From a4abbe78612ad390695576d41df991927cb4b9b9 Mon Sep 17 00:00:00 2001 From: spalger Date: Wed, 24 Feb 2016 01:02:51 -0800 Subject: [PATCH 1/4] [courier/fetch] decouple client & dataSource from fetch Currently the courier only fetches requests that wrap a dataSource object and use the es client. In order to support requests from more generic sources we need to break these pieces up. With this change the courier request objects no longer wrap a dataSource. To prevent changing too much existing behavior the AbstractRequest class was broken up into the DataSourceRequest and AbstractRequest classes. Any logic that depended on DataSource objects was split into the DataSourceRequest and the AbstractRequest now just manages the state and timing information for requests. To allow for creating more diverse request types, the doc and search types were renamed to EsDocRequest and EsSearchRequest. The strategies were also made more generic (they only need a single #execute() method now) and were similarly renamed to EsDocStrategy and EsSearchStrategy. Both of these strategies inherit from the EsAbstractStrategy class which implements their execute method and uses the subclasses to get #reqsFetchParamsToBody() and #getResponses() methods. --- src/ui/public/courier/courier.js | 6 +- .../public/courier/data_source/doc_source.js | 12 +- .../courier/data_source/search_source.js | 16 +- src/ui/public/courier/fetch/call_client.js | 125 ----------- .../courier/fetch/call_response_handlers.js | 18 +- src/ui/public/courier/fetch/fetch_these.js | 4 +- src/ui/public/courier/fetch/is_request.js | 2 +- .../courier/fetch/merge_duplicate_requests.js | 29 --- .../courier/fetch/request/error_handler.js | 29 --- .../public/courier/fetch/request/request.js | 114 ---------- src/ui/public/courier/fetch/request/search.js | 19 -- .../{fetch => fetch_types}/__tests__/doc.js | 0 .../__tests__/es_search_strategy.js} | 0 .../__tests__/segmented.js | 0 .../__tests__/segmented_create_queue.js | 0 .../__tests__/segmented_index_selection.js | 0 .../__tests__/segmented_size_picking.js | 0 .../courier/fetch_types/abstract_request.js | 172 +++++++++++++++ .../fetch_types/data_source_request.js | 144 +++++++++++++ .../fetch_types/es_abstract_strategy.js | 203 ++++++++++++++++++ .../doc.js => fetch_types/es_doc_request.js} | 18 +- .../doc.js => fetch_types/es_doc_strategy.js} | 19 +- .../courier/fetch_types/es_search_request.js | 19 ++ .../es_search_strategy.js} | 33 +-- .../es_segmented_request.js} | 12 +- .../es_segmented_request_handle.js} | 8 +- src/ui/public/courier/looper/doc.js | 6 +- src/ui/public/courier/looper/search.js | 6 +- 28 files changed, 617 insertions(+), 397 deletions(-) delete mode 100644 src/ui/public/courier/fetch/call_client.js delete mode 100644 src/ui/public/courier/fetch/merge_duplicate_requests.js delete mode 100644 src/ui/public/courier/fetch/request/error_handler.js delete mode 100644 src/ui/public/courier/fetch/request/request.js delete mode 100644 src/ui/public/courier/fetch/request/search.js rename src/ui/public/courier/{fetch => fetch_types}/__tests__/doc.js (100%) rename src/ui/public/courier/{fetch/strategy/__tests__/search.js => fetch_types/__tests__/es_search_strategy.js} (100%) rename src/ui/public/courier/{fetch/request => fetch_types}/__tests__/segmented.js (100%) rename src/ui/public/courier/{fetch/request => fetch_types}/__tests__/segmented_create_queue.js (100%) rename src/ui/public/courier/{fetch/request => fetch_types}/__tests__/segmented_index_selection.js (100%) rename src/ui/public/courier/{fetch/request => fetch_types}/__tests__/segmented_size_picking.js (100%) create mode 100644 src/ui/public/courier/fetch_types/abstract_request.js create mode 100644 src/ui/public/courier/fetch_types/data_source_request.js create mode 100644 src/ui/public/courier/fetch_types/es_abstract_strategy.js rename src/ui/public/courier/{fetch/request/doc.js => fetch_types/es_doc_request.js} (67%) rename src/ui/public/courier/{fetch/strategy/doc.js => fetch_types/es_doc_strategy.js} (55%) create mode 100644 src/ui/public/courier/fetch_types/es_search_request.js rename src/ui/public/courier/{fetch/strategy/search.js => fetch_types/es_search_strategy.js} (70%) rename src/ui/public/courier/{fetch/request/segmented.js => fetch_types/es_segmented_request.js} (96%) rename src/ui/public/courier/{fetch/request/segmented_handle.js => fetch_types/es_segmented_request_handle.js} (82%) diff --git a/src/ui/public/courier/courier.js b/src/ui/public/courier/courier.js index 486d5bcb20e88..e8e5e34a5b4e5 100644 --- a/src/ui/public/courier/courier.js +++ b/src/ui/public/courier/courier.js @@ -10,7 +10,7 @@ import Notifier from 'ui/notify/notifier'; import DocSourceProvider from './data_source/doc_source'; import SearchSourceProvider from './data_source/search_source'; -import SearchStrategyProvider from './fetch/strategy/search'; +import EsSearchStrategyProvider from './fetch_types/es_search_strategy'; import RequestQueueProvider from './_request_queue'; import ErrorHandlersProvider from './_error_handlers'; import FetchProvider from './fetch'; @@ -28,7 +28,7 @@ uiModules.get('kibana/courier') var DocSource = Private(DocSourceProvider); var SearchSource = Private(SearchSourceProvider); - var searchStrategy = Private(SearchStrategyProvider); + var esSearchStrategy = Private(EsSearchStrategyProvider); var requestQueue = Private(RequestQueueProvider); var errorHandlers = Private(ErrorHandlersProvider); @@ -75,7 +75,7 @@ uiModules.get('kibana/courier') * individual errors are routed to their respective requests. */ self.fetch = function () { - fetch.fetchQueued(searchStrategy).then(function () { + fetch.fetchQueued(esSearchStrategy).then(function () { searchLooper.restart(); }); }; diff --git a/src/ui/public/courier/data_source/doc_source.js b/src/ui/public/courier/data_source/doc_source.js index c2725309bfb7e..c4f39cae01dea 100644 --- a/src/ui/public/courier/data_source/doc_source.js +++ b/src/ui/public/courier/data_source/doc_source.js @@ -5,18 +5,18 @@ import 'ui/storage'; import DocSendToEsProvider from './_doc_send_to_es'; import AbstractDataSourceProvider from './_abstract'; -import DocRequestProvider from '../fetch/request/doc'; -import DocStrategyProvider from '../fetch/strategy/doc'; +import EsDocRequestProvider from '../fetch_types/es_doc_request'; +import EsDocStrategyProvider from '../fetch_types/es_doc_strategy'; export default function DocSourceFactory(Private, Promise, es, sessionStorage) { var sendToEs = Private(DocSendToEsProvider); var SourceAbstract = Private(AbstractDataSourceProvider); - var DocRequest = Private(DocRequestProvider); - var docStrategy = Private(DocStrategyProvider); + var EsDocRequest = Private(EsDocRequestProvider); + var esDocStrategy = Private(EsDocStrategyProvider); _.class(DocSource).inherits(SourceAbstract); function DocSource(initialState) { - DocSource.Super.call(this, initialState, docStrategy); + DocSource.Super.call(this, initialState, esDocStrategy); } DocSource.prototype.onUpdate = SourceAbstract.prototype.onResults; @@ -27,7 +27,7 @@ export default function DocSourceFactory(Private, Promise, es, sessionStorage) { *****/ DocSource.prototype._createRequest = function (defer) { - return new DocRequest(this, defer); + return new EsDocRequest(this, defer); }; /** diff --git a/src/ui/public/courier/data_source/search_source.js b/src/ui/public/courier/data_source/search_source.js index 0cd2a14d749be..72a459ceac9e0 100644 --- a/src/ui/public/courier/data_source/search_source.js +++ b/src/ui/public/courier/data_source/search_source.js @@ -1,22 +1,22 @@ import _ from 'lodash'; -import NormalizeSortRequestProvider from './_normalize_sort_request'; import rootSearchSource from './_root_search_source'; import AbstractDataSourceProvider from './_abstract'; -import SearchRequestProvider from '../fetch/request/search'; -import SegmentedRequestProvider from '../fetch/request/segmented'; -import SearchStrategyProvider from '../fetch/strategy/search'; +import EsSearchRequestProvider from '../fetch_types/es_search_request'; +import EsSegmentedRequestProvider from '../fetch_types/es_segmented_request'; +import EsSearchStrategyProvider from '../fetch_types/es_search_strategy'; +import NormalizeSortRequestProvider from './_normalize_sort_request'; export default function SearchSourceFactory(Promise, Private) { var SourceAbstract = Private(AbstractDataSourceProvider); - var SearchRequest = Private(SearchRequestProvider); - var SegmentedRequest = Private(SegmentedRequestProvider); - var searchStrategy = Private(SearchStrategyProvider); + var SearchRequest = Private(EsSearchRequestProvider); + var SegmentedRequest = Private(EsSegmentedRequestProvider); + var esSearchStrategy = Private(EsSearchStrategyProvider); var normalizeSortRequest = Private(NormalizeSortRequestProvider); _.class(SearchSource).inherits(SourceAbstract); function SearchSource(initialState) { - SearchSource.Super.call(this, initialState, searchStrategy); + SearchSource.Super.call(this, initialState, esSearchStrategy); } /***** diff --git a/src/ui/public/courier/fetch/call_client.js b/src/ui/public/courier/fetch/call_client.js deleted file mode 100644 index 2157e4420d958..0000000000000 --- a/src/ui/public/courier/fetch/call_client.js +++ /dev/null @@ -1,125 +0,0 @@ -import _ from 'lodash'; - -import IsRequestProvider from './is_request'; -import MergeDuplicatesRequestProvider from './merge_duplicate_requests'; -import ReqStatusProvider from './req_status'; - -export default function CourierFetchCallClient(Private, Promise, es, esShardTimeout, sessionId) { - - const isRequest = Private(IsRequestProvider); - const mergeDuplicateRequests = Private(MergeDuplicatesRequestProvider); - - const ABORTED = Private(ReqStatusProvider).ABORTED; - const DUPLICATE = Private(ReqStatusProvider).DUPLICATE; - - function callClient(strategy, requests) { - // merging docs can change status to DUPLICATE, capture new statuses - const statuses = mergeDuplicateRequests(requests); - - // get the actual list of requests that we will be fetching - const executable = statuses.filter(isRequest); - let execCount = executable.length; - - // resolved by respond() - let esPromise; - const defer = Promise.defer(); - - // for each respond with either the response or ABORTED - const respond = function (responses) { - responses = responses || []; - return Promise.map(requests, function (req, i) { - switch (statuses[i]) { - case ABORTED: - return ABORTED; - case DUPLICATE: - return req._uniq.resp; - default: - return responses[_.findIndex(executable, req)]; - } - }) - .then( - (res) => defer.resolve(res), - (err) => defer.reject(err) - ); - }; - - - // handle a request being aborted while being fetched - const requestWasAborted = Promise.method(function (req, i) { - if (statuses[i] === ABORTED) { - defer.reject(new Error('Request was aborted twice?')); - } - - execCount -= 1; - if (execCount > 0) { - // the multi-request still contains other requests - return; - } - - if (esPromise && _.isFunction(esPromise.abort)) { - esPromise.abort(); - } - - esPromise = ABORTED; - - return respond(); - }); - - - // attach abort handlers, close over request index - statuses.forEach(function (req, i) { - if (!isRequest(req)) return; - req.whenAborted(function () { - requestWasAborted(req, i).catch(defer.reject); - }); - }); - - - // Now that all of THAT^^^ is out of the way, lets actually - // call out to elasticsearch - Promise.map(executable, function (req) { - return Promise.try(req.getFetchParams, void 0, req) - .then(function (fetchParams) { - return (req.fetchParams = fetchParams); - }); - }) - .then(function (reqsFetchParams) { - return strategy.reqsFetchParamsToBody(reqsFetchParams); - }) - .then(function (body) { - // while the strategy was converting, our request was aborted - if (esPromise === ABORTED) { - throw ABORTED; - } - - return (esPromise = es[strategy.clientMethod]({ - timeout: esShardTimeout, - ignore_unavailable: true, - preference: sessionId, - body: body - })); - }) - .then(function (clientResp) { - return strategy.getResponses(clientResp); - }) - .then(respond) - .catch(function (err) { - if (err === ABORTED) respond(); - else defer.reject(err); - }); - - // return our promise, but catch any errors we create and - // send them to the requests - return defer.promise - .catch(function (err) { - requests.forEach(function (req, i) { - if (statuses[i] !== ABORTED) { - req.handleFailure(err); - } - }); - }); - - } - - return callClient; -}; diff --git a/src/ui/public/courier/fetch/call_response_handlers.js b/src/ui/public/courier/fetch/call_response_handlers.js index 41b4c2bc20205..10b6813fef79a 100644 --- a/src/ui/public/courier/fetch/call_response_handlers.js +++ b/src/ui/public/courier/fetch/call_response_handlers.js @@ -1,4 +1,6 @@ -import { RequestFailure, SearchTimeout, ShardFailure } from 'ui/errors'; +import { RequestFailure } from 'ui/errors'; + +import 'ui/promises'; import ReqStatusProvider from './req_status'; import NotifierProvider from './notifier'; @@ -15,15 +17,7 @@ export default function CourierFetchCallResponseHandlers(Private, Promise) { return ABORTED; } - let resp = responses[i]; - - if (resp.timed_out) { - notify.warning(new SearchTimeout()); - } - - if (resp._shards && resp._shards.failed) { - notify.warning(new ShardFailure(resp)); - } + const resp = responses[i]; function progress() { if (req.isIncomplete()) { @@ -43,10 +37,6 @@ export default function CourierFetchCallResponseHandlers(Private, Promise) { } return Promise.try(function () { - return req.transformResponse(resp); - }) - .then(function () { - resp = arguments[0]; return req.handleResponse(resp); }) .then(progress); diff --git a/src/ui/public/courier/fetch/fetch_these.js b/src/ui/public/courier/fetch/fetch_these.js index 78affdf29ac53..b5b72086593ca 100644 --- a/src/ui/public/courier/fetch/fetch_these.js +++ b/src/ui/public/courier/fetch/fetch_these.js @@ -1,6 +1,5 @@ import NotifierProvider from './notifier'; import ForEachStrategyProvider from './for_each_strategy'; -import CallClientProvider from './call_client'; import CallResponseHandlersProvider from './call_response_handlers'; import ContinueIncompleteProvider from './continue_incomplete'; import ReqStatusProvider from './req_status'; @@ -10,7 +9,6 @@ export default function FetchTheseProvider(Private, Promise) { const forEachStrategy = Private(ForEachStrategyProvider); // core tasks - const callClient = Private(CallClientProvider); const callResponseHandlers = Private(CallResponseHandlersProvider); const continueIncomplete = Private(ContinueIncompleteProvider); @@ -36,7 +34,7 @@ export default function FetchTheseProvider(Private, Promise) { return startRequests(requests) .then(function () { - return callClient(strategy, requests); + return strategy.execute(requests); }) .then(function (responses) { return callResponseHandlers(requests, responses); diff --git a/src/ui/public/courier/fetch/is_request.js b/src/ui/public/courier/fetch/is_request.js index f64ab1e1ec08d..2b93077d13fd8 100644 --- a/src/ui/public/courier/fetch/is_request.js +++ b/src/ui/public/courier/fetch/is_request.js @@ -1,4 +1,4 @@ -import AbstractRequestProvider from './request'; +import AbstractRequestProvider from '../fetch_types/abstract_request'; export default function IsRequestProvider(Private) { const AbstractRequest = Private(AbstractRequestProvider); diff --git a/src/ui/public/courier/fetch/merge_duplicate_requests.js b/src/ui/public/courier/fetch/merge_duplicate_requests.js deleted file mode 100644 index 5c698f422834b..0000000000000 --- a/src/ui/public/courier/fetch/merge_duplicate_requests.js +++ /dev/null @@ -1,29 +0,0 @@ -import IsRequestProvider from './is_request'; -import ReqStatusProvider from './req_status'; - -export default function FetchMergeDuplicateRequests(Private) { - const isRequest = Private(IsRequestProvider); - const DUPLICATE = Private(ReqStatusProvider).DUPLICATE; - - function mergeDuplicateRequests(requests) { - // dedupe requests - const index = {}; - return requests.map(function (req) { - if (!isRequest(req)) return req; - - const iid = req.source._instanceid; - if (!index[iid]) { - // this request is unique so far - index[iid] = req; - // keep the request - return req; - } - - // the source was requested at least twice - req._uniq = index[iid]; - return DUPLICATE; - }); - } - - return mergeDuplicateRequests; -}; diff --git a/src/ui/public/courier/fetch/request/error_handler.js b/src/ui/public/courier/fetch/request/error_handler.js deleted file mode 100644 index 91e6c0929df39..0000000000000 --- a/src/ui/public/courier/fetch/request/error_handler.js +++ /dev/null @@ -1,29 +0,0 @@ -import Notifier from 'ui/notify/notifier'; - -import ErrorHandlersProvider from '../../_error_handlers'; - -export default function RequestErrorHandlerFactory(Private) { - const errHandlers = Private(ErrorHandlersProvider); - - const notify = new Notifier({ - location: 'Courier Fetch Error' - }); - - function handleError(req, error) { - const myHandlers = []; - - errHandlers.splice(0).forEach(function (handler) { - (handler.source === req.source ? myHandlers : errHandlers).push(handler); - }); - - if (!myHandlers.length) { - notify.fatal(new Error(`unhandled courier request error: ${ notify.describeError(error) }`)); - } else { - myHandlers.forEach(function (handler) { - handler.defer.resolve(error); - }); - } - } - - return handleError; -}; diff --git a/src/ui/public/courier/fetch/request/request.js b/src/ui/public/courier/fetch/request/request.js deleted file mode 100644 index c360cf5dd94e9..0000000000000 --- a/src/ui/public/courier/fetch/request/request.js +++ /dev/null @@ -1,114 +0,0 @@ -import _ from 'lodash'; -import moment from 'moment'; - -import errors from 'ui/errors'; - -import RequestQueueProvider from '../../_request_queue'; -import ErrorHandlerRequestProvider from './error_handler'; - -export default function AbstractReqProvider(Private, Promise) { - const requestQueue = Private(RequestQueueProvider); - const requestErrorHandler = Private(ErrorHandlerRequestProvider); - - const onStop = Symbol('onStopMethod'); - const whenAbortedHandlers = Symbol('whenAbortedHandlers'); - - return class AbstractReq { - constructor(source, defer) { - this.source = source; - this.defer = defer || Promise.defer(); - this[whenAbortedHandlers] = []; - - requestQueue.push(this); - } - - canStart() { - return Boolean(!this.stopped && !this.source._fetchDisabled); - } - - start() { - if (this.started) { - throw new TypeError('Unable to start request because it has already started'); - } - - this.started = true; - this.moment = moment(); - - const source = this.source; - if (source.activeFetchCount) { - source.activeFetchCount += 1; - } else { - source.activeFetchCount = 1; - } - - source.history = [this]; - } - - getFetchParams() { - return this.source._flatten(); - } - - transformResponse(resp) { - return resp; - } - - filterError(resp) { - return false; - } - - handleResponse(resp) { - this.success = true; - this.resp = resp; - } - - handleFailure(error) { - this.success = false; - this.resp = error && error.resp; - this.retry(); - return requestErrorHandler(this, error); - } - - isIncomplete() { - return false; - } - - continue() { - throw new Error('Unable to continue ' + this.type + ' request'); - } - - retry() { - const clone = this.clone(); - this.abort(); - return clone; - } - - [onStop]() { - if (this.stopped) return; - - this.stopped = true; - this.source.activeFetchCount -= 1; - _.pull(requestQueue, this); - } - - abort() { - this[onStop](); - this.defer = null; - this.aborted = true; - _.callEach(this[whenAbortedHandlers]); - } - - whenAborted(cb) { - this[whenAbortedHandlers].push(cb); - } - - completefunction() { - this[onStop](); - this.ms = this.moment.diff() * -1; - this.defer.resolve(this.resp); - } - - clone() { - return new this.constructor(this.source, this.defer); - } - }; -}; diff --git a/src/ui/public/courier/fetch/request/search.js b/src/ui/public/courier/fetch/request/search.js deleted file mode 100644 index 3556a8aee9c42..0000000000000 --- a/src/ui/public/courier/fetch/request/search.js +++ /dev/null @@ -1,19 +0,0 @@ -import _ from 'lodash'; - -import SearchStrategyProvider from '../strategy/search'; -import AbstractRequestProvider from './request'; - -export default function SearchReqProvider(Private) { - - const searchStrategy = Private(SearchStrategyProvider); - const AbstractRequest = Private(AbstractRequestProvider); - - return class SearchReq extends AbstractRequest { - constructor(...args) { - super(...args); - - this.type = 'search'; - this.strategy = searchStrategy; - } - }; -}; diff --git a/src/ui/public/courier/fetch/__tests__/doc.js b/src/ui/public/courier/fetch_types/__tests__/doc.js similarity index 100% rename from src/ui/public/courier/fetch/__tests__/doc.js rename to src/ui/public/courier/fetch_types/__tests__/doc.js diff --git a/src/ui/public/courier/fetch/strategy/__tests__/search.js b/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js similarity index 100% rename from src/ui/public/courier/fetch/strategy/__tests__/search.js rename to src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented.js b/src/ui/public/courier/fetch_types/__tests__/segmented.js similarity index 100% rename from src/ui/public/courier/fetch/request/__tests__/segmented.js rename to src/ui/public/courier/fetch_types/__tests__/segmented.js diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented_create_queue.js b/src/ui/public/courier/fetch_types/__tests__/segmented_create_queue.js similarity index 100% rename from src/ui/public/courier/fetch/request/__tests__/segmented_create_queue.js rename to src/ui/public/courier/fetch_types/__tests__/segmented_create_queue.js diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented_index_selection.js b/src/ui/public/courier/fetch_types/__tests__/segmented_index_selection.js similarity index 100% rename from src/ui/public/courier/fetch/request/__tests__/segmented_index_selection.js rename to src/ui/public/courier/fetch_types/__tests__/segmented_index_selection.js diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented_size_picking.js b/src/ui/public/courier/fetch_types/__tests__/segmented_size_picking.js similarity index 100% rename from src/ui/public/courier/fetch/request/__tests__/segmented_size_picking.js rename to src/ui/public/courier/fetch_types/__tests__/segmented_size_picking.js diff --git a/src/ui/public/courier/fetch_types/abstract_request.js b/src/ui/public/courier/fetch_types/abstract_request.js new file mode 100644 index 0000000000000..8daa30fca80ba --- /dev/null +++ b/src/ui/public/courier/fetch_types/abstract_request.js @@ -0,0 +1,172 @@ +import _ from 'lodash'; +import moment from 'moment'; + +import errors from 'ui/errors'; + +import RequestQueueProvider from '../_request_queue'; + +export default function AbstractRequestProvider(Private, Promise) { + const requestQueue = Private(RequestQueueProvider); + + const onStop = Symbol('onStopMethod'); + const whenAbortedHandlers = Symbol('whenAbortedHandlers'); + + return class AbstractRequest { + /** + * Initialize the request and add it to the couriers request + * queue so that the request will be considered for execution + * by the courier + * + * @constructor + */ + constructor() { + if (this.constructor === AbstractRequest) { + throw new Error('AbstractRequest should not be constructed directly'); + } + + this[whenAbortedHandlers] = []; + requestQueue.push(this); + } + + /** + * Determine if the request can be started + * @return {Boolean} + */ + canStart() { + return Boolean(!this.stopped); + } + + /** + * Mark the request as started + * + * @return {undefined} + */ + start() { + if (this.started) { + throw new TypeError('Unable to start request because it has already started'); + } + + this.started = true; + this.moment = moment(); + } + + /** + * Determine if a response should be considered an error + * + * @return {Boolean} + */ + filterError(resp) { + return false; + } + + /** + * Mark the request as successful + * + * @return {undefined} + */ + handleResponse(resp) { + this.success = true; + } + + /** + * Mark the request as failed + * + * @return {undefined} + */ + handleFailure(error) { + this.success = false; + } + + /** + * Check to see if this request could be continued + * + * @return {Boolean} + */ + isIncomplete() { + return false; + } + + /** + * Mark this request as continued, or in this case + * throw an error because requests are not continuable + * by default + * + * @return {undefined} + */ + continue() { + throw new Error('Unable to continue ' + this.type + ' request'); + } + + /** + * Retry this request by aborting the current request + * and reentering the request in the request queue + * + * @return {AbstractRequest} + */ + retry() { + const clone = this.clone(); + this.abort(); + return clone; + } + + /** + * Mark the request as stopped and remove it from + * the request queue. Used by both #abort() and + * #complete(). + * + * @private + * @return {undefined} + */ + [onStop]() { + if (this.stopped) return; + + this.stopped = true; + _.pull(requestQueue, this); + } + + /** + * Mark the request as aborted and remove it from the + * request queue. + * + * @return {undefined} + */ + abort() { + this[onStop](); + this.aborted = true; + _.callEach(this[whenAbortedHandlers]); + } + + /** + * Add a function that should be called when this + * request is aborted. + * + * @param {Function} cb - the function to call + * @return {undefined} + */ + whenAborted(cb) { + this[whenAbortedHandlers].push(cb); + } + + /** + * Mark this request as complete and remove it from + * the request queue. + * + * @return {undefined} + */ + complete() { + this[onStop](); + this.ms = this.moment.diff() * -1; + } + + /** + * Create a clone of this request. Used when retrying a + * request. Causes the new request to be added to the + * request queue. + * + * @return {AbstractRequest} - the clone + */ + clone() { + return new this.constructor(); + } + }; +}; diff --git a/src/ui/public/courier/fetch_types/data_source_request.js b/src/ui/public/courier/fetch_types/data_source_request.js new file mode 100644 index 0000000000000..e4c93e51661a2 --- /dev/null +++ b/src/ui/public/courier/fetch_types/data_source_request.js @@ -0,0 +1,144 @@ +import notify from 'ui/notify'; + +import AbstractRequestProvider from './abstract_request'; +import ErrorHandlersProvider from '../_error_handlers'; + +export default function DataSourceRequestProvider(Private, Promise) { + const errorHandlers = Private(ErrorHandlersProvider); + const AbstractRequest = Private(AbstractRequestProvider); + + return class DataSourceRequest extends AbstractRequest { + + /** + * Overriden to track the source for this request + * + * @override + * @constructor + */ + constructor(source, defer) { + super(); + + this.defer = defer || Promise.defer(); + this.source = source; + } + + /** + * Overriden to consult wether the source has fetch disabled for + * some reason + * + * @override + * @return {Boolean} + */ + canStart() { + return Boolean(super.canStart() && !this.source._fetchDisabled); + } + + /** + * Read the data source to produce fetch params for the elastisearch client + * + * @return {Object} - request parameters for the es client + */ + getFetchParams() { + return this.source._flatten(); + } + + /** + * Overridden to track the active request count on the search source + * + * @override + * @return {undefined} + */ + start() { + super.start(); + + const source = this.source; + if (source.activeFetchCount) { + source.activeFetchCount += 1; + } else { + source.activeFetchCount = 1; + } + + source.history = [this]; + } + + /** + * Overridden to track the active request count on the + * search source and to dereference the defer + * + * @override + * @return {undefined} + */ + abort() { + if (this.stopped) return; + + super.abort(); + this.defer = null; + this.source.activeFetchCount -= 1; + } + + /** + * Overridden to track the response, which will + * later be sent to the defer + * + * @override + * @return {undefined} + */ + handleResponse(resp) { + super.handleResponse(); + this.resp = resp; + } + + /** + * Overridden to track the response from the error, which will + * later be sent to the defer + * + * @override + * @return {undefined} + */ + handleFailure(error) { + super.handleFailure(); + + this.resp = error && error.resp; + const ownHandlers = []; + const otherHandlers = []; + errorHandlers.forEach(handler => { + const list = handler.source === this.source ? ownHandlers : otherHandlers; + list.push(handler); + }); + + // remove all handlers from errorHandlers and replace + // them with just the list of other listeners + errorHandlers.splice(0, errorHandlers.length, otherHandlers); + + if (!ownHandlers.length) { + notify.fatal(new Error(`unhandled courier request error: ${notify.describeError(error) }`)); + } else { + ownHandlers.forEach(handler => handler.defer.resolve(error)); + } + + this.retry(); + } + + /** + * Overridden to send response to the defer + * + * @override + * @return {undefined} + */ + complete() { + super.complete(); + this.source.activeFetchCount -= 1; + this.defer.resolve(this.resp); + } + + /** + * Overridden to inject the source argument when cloned + * + * @override + * @return {undefined} + */ + clone() { + return new this.constructor(this.source, this.defer); + } + }; +}; diff --git a/src/ui/public/courier/fetch_types/es_abstract_strategy.js b/src/ui/public/courier/fetch_types/es_abstract_strategy.js new file mode 100644 index 0000000000000..54b2654158441 --- /dev/null +++ b/src/ui/public/courier/fetch_types/es_abstract_strategy.js @@ -0,0 +1,203 @@ +import _ from 'lodash'; + +import { SearchTimeout, ShardFailure } from 'ui/errors'; + +import IsRequestProvider from '../fetch/is_request'; +import ReqStatusProvider from '../fetch/req_status'; +import NotifierProvider from '../fetch/notifier'; +import AbstractStrategyProvider from './abstract_strategy'; + +export default function EsClientExecutorProvider(Private, Promise, es, esShardTimeout, sessionId) { + const notify = Private(NotifierProvider); + const isRequest = Private(IsRequestProvider); + const AbstractStrategy = Private(AbstractStrategyProvider); + + const { ABORTED, DUPLICATE } = Private(ReqStatusProvider); + + return class EsAbstractStrategy extends AbstractStrategy { + constructor() { + super(); + if (this.constructor === EsAbstractStrategy) { + throw new Error('EsAbstractStrategy should not be constructed directly'); + } + } + + /** + * Convert the list of requests into the body which should be sent to + * elasticsearch. + * + * @return {Any} + */ + reqsFetchParamsToBody() { + throw new Error('this method must be overriden by subclassses of EsAbstractStrategy'); + } + + /** + * Convert the response from elasticsearch into an array of ordered responses + * which matches the request order. + * + * @return {Array[Any]} + */ + getResponses() { + throw new Error('this method must be overriden by subclassses of EsAbstractStrategy'); + } + + /** + * Combine any requests that are the same, so that they are + * only executed in elasticsearch once. + * + * Duplicate values are converted into DUPLICATE status + * objects, and deduplcilated on response + * + * @private + * @param {Array[Requests]} requests + * @return {Array[Requests]} + */ + mergeDuplicateRequests(requests) { + // dedupe requests + const index = {}; + return requests.map(function (req) { + if (!isRequest(req)) return req; + + const iid = req.source._instanceid; + if (!index[iid]) { + // this request is unique so far + index[iid] = req; + // keep the request + return req; + } + + // the source was requested at least twice + req._uniq = index[iid]; + return DUPLICATE; + }); + } + + /** + * Called by the courier when requests with this strategy must + * be executed. + * + * @param {Array[Request]} requests + * @return {Promise} + */ + execute(requests) { + // merging docs can change status to DUPLICATE, capture new statuses + const statuses = this.mergeDuplicateRequests(requests); + + // get the actual list of requests that we will be fetching + const executable = statuses.filter(isRequest); + let execCount = executable.length; + + // resolved by respond() + let esPromise; + const defer = Promise.defer(); + + const warnAboutEsErrors = (resp) => { + if (resp.timed_out) { + notify.warning(new SearchTimeout()); + } + + if (resp._shards && resp._shards.failed) { + notify.warning(new ShardFailure(resp)); + } + }; + + // for each respond with either the response or ABORTED + const respond = (responses) => { + responses = responses || []; + return Promise.map(requests, (req, i) => { + switch (statuses[i]) { + case ABORTED: + return ABORTED; + case DUPLICATE: + return req._uniq.resp; + default: + const response = responses[_.findIndex(executable, req)]; + warnAboutEsErrors(response); + return response; + } + }) + .then( + (res) => defer.resolve(res), + (err) => defer.reject(err) + ); + }; + + // handle a request being aborted while being fetched + const requestWasAborted = Promise.method((req, i) => { + if (statuses[i] === ABORTED) { + defer.reject(new Error('Request was aborted twice?')); + } + + execCount -= 1; + if (execCount > 0) { + // the multi-request still contains other requests + return; + } + + if (esPromise && _.isFunction(esPromise.abort)) { + esPromise.abort(); + } + + esPromise = ABORTED; + + return respond(); + }); + + + // attach abort handlers, close over request index + statuses.forEach((req, i) => { + if (!isRequest(req)) return; + req.whenAborted(() => { + requestWasAborted(req, i).catch(defer.reject); + }); + }); + + + // Now that all of THAT^^^ is out of the way, lets actually + // call out to elasticsearch + Promise.map(executable, (req) => { + return Promise.try(req.getFetchParams, void 0, req) + .then((fetchParams) => { + return (req.fetchParams = fetchParams); + }); + }) + .then((reqsFetchParams) => { + return this.reqsFetchParamsToBody(reqsFetchParams); + }) + .then((body) => { + // while the reqsFetchParamsToBody was converting, our request may have been aborted + if (esPromise === ABORTED) { + throw ABORTED; + } + + return (esPromise = es[this.clientMethod]({ + timeout: esShardTimeout, + ignore_unavailable: true, + preference: sessionId, + body: body + })); + }) + .then((clientResp) => { + return this.getResponses(clientResp); + }) + .then(respond) + .catch((err) => { + if (err === ABORTED) respond(); + else defer.reject(err); + }); + + // return our promise, but catch any errors we create and + // send them to the requests + return defer.promise + .catch((err) => { + requests.forEach((req, i) => { + if (statuses[i] !== ABORTED) { + req.handleFailure(err); + } + }); + }); + } + }; + +} diff --git a/src/ui/public/courier/fetch/request/doc.js b/src/ui/public/courier/fetch_types/es_doc_request.js similarity index 67% rename from src/ui/public/courier/fetch/request/doc.js rename to src/ui/public/courier/fetch_types/es_doc_request.js index ac41d141c8cdd..1655539c240f2 100644 --- a/src/ui/public/courier/fetch/request/doc.js +++ b/src/ui/public/courier/fetch_types/es_doc_request.js @@ -1,19 +1,19 @@ import _ from 'lodash'; -import DocStrategyProvider from '../strategy/doc'; -import AbstractRequestProvider from './request'; +import EsDocStrategyProvider from './es_doc_strategy'; +import DataSourceRequestProvider from './data_source_request'; export default function DocRequestProvider(Private) { - const docStrategy = Private(DocStrategyProvider); - const AbstractRequest = Private(AbstractRequestProvider); + const esDocStrategy = Private(EsDocStrategyProvider); + const DataSourceRequest = Private(DataSourceRequestProvider); - class DocRequest extends AbstractRequest { + return class EsDocRequest extends DataSourceRequest { constructor(...args) { super(...args); - this.type = 'doc'; - this.strategy = docStrategy; + this.type = 'es_doc'; + this.strategy = esDocStrategy; } canStart() { @@ -39,7 +39,5 @@ export default function DocRequestProvider(Private) { return super.handleResponse(resp); } - } - - return DocRequest; + }; }; diff --git a/src/ui/public/courier/fetch/strategy/doc.js b/src/ui/public/courier/fetch_types/es_doc_strategy.js similarity index 55% rename from src/ui/public/courier/fetch/strategy/doc.js rename to src/ui/public/courier/fetch_types/es_doc_strategy.js index 86a46af90ca8a..f4ba86e31d32c 100644 --- a/src/ui/public/courier/fetch/strategy/doc.js +++ b/src/ui/public/courier/fetch_types/es_doc_strategy.js @@ -1,24 +1,31 @@ -export default function FetchStrategyForDoc(Promise) { - return { - clientMethod: 'mget', +import EsAbstractStrategyProvider from './es_abstract_strategy'; + +export default function FetchStrategyForDoc(Private, Promise) { + const EsAbstractStrategy = Private(EsAbstractStrategyProvider); + + return new class EsDocStrategy extends EsAbstractStrategy { + constructor() { + super(); + this.clientMethod = 'mget'; + } /** * Flatten a series of requests into as ES request body * @param {array} requests - an array of flattened requests * @return {Promise} - a promise that is fulfilled by the request body */ - reqsFetchParamsToBody: function (reqsFetchParams) { + reqsFetchParamsToBody(reqsFetchParams) { return Promise.resolve({ docs: reqsFetchParams }); - }, + } /** * Fetch the multiple responses from the ES Response * @param {object} resp - The response sent from Elasticsearch * @return {array} - the list of responses */ - getResponses: function (resp) { + getResponses(resp) { return resp.docs; } }; diff --git a/src/ui/public/courier/fetch_types/es_search_request.js b/src/ui/public/courier/fetch_types/es_search_request.js new file mode 100644 index 0000000000000..87bc92a60e699 --- /dev/null +++ b/src/ui/public/courier/fetch_types/es_search_request.js @@ -0,0 +1,19 @@ +import _ from 'lodash'; + +import EsSearchStrategyProvider from './es_search_strategy'; +import DataSourceRequestProvider from './data_source_request'; + +export default function SearchReqProvider(Private) { + + const esSearchStrategy = Private(EsSearchStrategyProvider); + const DataSourceRequest = Private(DataSourceRequestProvider); + + return class EsSearchReq extends DataSourceRequest { + constructor(...args) { + super(...args); + + this.type = 'es_search'; + this.strategy = esSearchStrategy; + } + }; +}; diff --git a/src/ui/public/courier/fetch/strategy/search.js b/src/ui/public/courier/fetch_types/es_search_strategy.js similarity index 70% rename from src/ui/public/courier/fetch/strategy/search.js rename to src/ui/public/courier/fetch_types/es_search_strategy.js index 2abda799343b6..6222e9af29331 100644 --- a/src/ui/public/courier/fetch/strategy/search.js +++ b/src/ui/public/courier/fetch_types/es_search_strategy.js @@ -2,19 +2,24 @@ import _ from 'lodash'; import angular from 'angular'; import { toJson } from 'ui/utils/aggressive_parse'; +import EsAbstractStrategyProvider from './es_abstract_strategy'; export default function FetchStrategyForSearch(Private, Promise, timefilter) { + const EsAbstractStrategy = Private(EsAbstractStrategyProvider); - return { - clientMethod: 'msearch', + return new class EsSearchStrategy extends EsAbstractStrategy { + constructor() { + super(); + this.clientMethod = 'msearch'; + } /** - * Flatten a series of requests into as ES request body - * - * @param {array} requests - the requests to serialize - * @return {Promise} - a promise that is fulfilled by the request body - */ - reqsFetchParamsToBody: function (reqsFetchParams) { + * Flatten a series of requests into as ES request body + * + * @param {array} requests - the requests to serialize + * @return {Promise} - a promise that is fulfilled by the request body + */ + reqsFetchParamsToBody(reqsFetchParams) { return Promise.map(reqsFetchParams, function (fetchParams) { return Promise.resolve(fetchParams.index) .then(function (indexList) { @@ -50,14 +55,14 @@ export default function FetchStrategyForSearch(Private, Promise, timefilter) { .then(function (requests) { return requests.join('\n') + '\n'; }); - }, + } /** - * Fetch the multiple responses from the ES Response - * @param {object} resp - The response sent from Elasticsearch - * @return {array} - the list of responses - */ - getResponses: function (resp) { + * Fetch the multiple responses from the ES Response + * @param {object} resp - The response sent from Elasticsearch + * @return {array} - the list of responses + */ + getResponses(resp) { return resp.responses; } }; diff --git a/src/ui/public/courier/fetch/request/segmented.js b/src/ui/public/courier/fetch_types/es_segmented_request.js similarity index 96% rename from src/ui/public/courier/fetch/request/segmented.js rename to src/ui/public/courier/fetch_types/es_segmented_request.js index 71ae5e2fa8b8d..3d87f3592c52a 100644 --- a/src/ui/public/courier/fetch/request/segmented.js +++ b/src/ui/public/courier/fetch_types/es_segmented_request.js @@ -3,18 +3,18 @@ import { isNumber } from 'lodash'; import Notifier from 'ui/notify/notifier'; -import SearchRequestProvider from './search'; -import SegmentedHandleProvider from './segmented_handle'; +import EsSearchRequestProvider from './es_search_request'; +import EsSegmentedRequestHandleProvider from './es_segmented_request_handle'; export default function SegmentedReqProvider(es, Private, Promise, timefilter, config) { - const SearchReq = Private(SearchRequestProvider); - const SegmentedHandle = Private(SegmentedHandleProvider); + const EsSearchReq = Private(EsSearchRequestProvider); + const EsSegmentedRequestHandle = Private(EsSegmentedRequestHandleProvider); const notify = new Notifier({ location: 'Segmented Fetch' }); - class SegmentedReq extends SearchReq { + class SegmentedReq extends EsSearchReq { constructor(source, defer, initFn) { super(source, defer); @@ -28,7 +28,7 @@ export default function SegmentedReqProvider(es, Private, Promise, timefilter, c this._direction = 'desc'; this._sortFn = null; this._queueCreated = false; - this._handle = new SegmentedHandle(this); + this._handle = new EsSegmentedRequestHandle(this); this._hitWindow = null; diff --git a/src/ui/public/courier/fetch/request/segmented_handle.js b/src/ui/public/courier/fetch_types/es_segmented_request_handle.js similarity index 82% rename from src/ui/public/courier/fetch/request/segmented_handle.js rename to src/ui/public/courier/fetch_types/es_segmented_request_handle.js index b4dda1c70a49f..811267e835b1c 100644 --- a/src/ui/public/courier/fetch/request/segmented_handle.js +++ b/src/ui/public/courier/fetch_types/es_segmented_request_handle.js @@ -24,19 +24,19 @@ export default function CourierSegmentedReqHandle(Private) { } setDirection(...args) { - this[segmentedRequest](...args); + this[segmentedRequest].setDirection(...args); } setSize(...args) { - this[segmentedRequest](...args); + this[segmentedRequest].setSize(...args); } setMaxSegments(...args) { - this[segmentedRequest](...args); + this[segmentedRequest].setMaxSegments(...args); } setSortFn(...args) { - this[segmentedRequest](...args); + this[segmentedRequest].setSortFn(...args); } }; }; diff --git a/src/ui/public/courier/looper/doc.js b/src/ui/public/courier/looper/doc.js index a1c6104ce20aa..f6f7ac5698da3 100644 --- a/src/ui/public/courier/looper/doc.js +++ b/src/ui/public/courier/looper/doc.js @@ -1,18 +1,18 @@ import FetchProvider from '../fetch'; import LooperProvider from './_looper'; -import DocStrategyProvider from '../fetch/strategy/doc'; +import EsDocStrategyProvider from '../fetch_types/es_doc_strategy'; export default function DocLooperService(Private) { var fetch = Private(FetchProvider); var Looper = Private(LooperProvider); - var DocStrategy = Private(DocStrategyProvider); + var esDocStrategy = Private(EsDocStrategyProvider); /** * The Looper which will manage the doc fetch interval * @type {Looper} */ var docLooper = new Looper(1500, function () { - fetch.fetchQueued(DocStrategy); + fetch.fetchQueued(esDocStrategy); }); return docLooper; diff --git a/src/ui/public/courier/looper/search.js b/src/ui/public/courier/looper/search.js index cb3914c601c0e..cd324aae07988 100644 --- a/src/ui/public/courier/looper/search.js +++ b/src/ui/public/courier/looper/search.js @@ -1,11 +1,11 @@ import FetchProvider from '../fetch'; -import SearchStrategyProvider from '../fetch/strategy/search'; +import EsSearchStrategyProvider from '../fetch_types/es_search_strategy'; import RequestQueueProvider from '../_request_queue'; import LooperProvider from './_looper'; export default function SearchLooperService(Private, Promise, Notifier, $rootScope) { var fetch = Private(FetchProvider); - var searchStrategy = Private(SearchStrategyProvider); + var esSearchStrategy = Private(EsSearchStrategyProvider); var requestQueue = Private(RequestQueueProvider); var Looper = Private(LooperProvider); @@ -18,7 +18,7 @@ export default function SearchLooperService(Private, Promise, Notifier, $rootSco var searchLooper = new Looper(null, function () { $rootScope.$broadcast('courier:searchRefresh'); return fetch.these( - requestQueue.getInactive(searchStrategy) + requestQueue.getInactive(esSearchStrategy) ); }); From b27fcafc55576fd9317fa513b703913165992467 Mon Sep 17 00:00:00 2001 From: spalger Date: Wed, 24 Feb 2016 09:01:13 -0800 Subject: [PATCH 2/4] [courier/fetch] handle es errors in the es_abstract_strategy --- .../public/courier/fetch/call_response_handlers.js | 14 ++++---------- .../fetch_types/__tests__/es_search_strategy.js | 2 +- .../public/courier/fetch_types/abstract_request.js | 9 --------- .../courier/fetch_types/es_abstract_strategy.js | 14 +++++++++----- .../courier/fetch_types/es_segmented_request.js | 4 +++- 5 files changed, 17 insertions(+), 26 deletions(-) diff --git a/src/ui/public/courier/fetch/call_response_handlers.js b/src/ui/public/courier/fetch/call_response_handlers.js index 10b6813fef79a..c9723c9ad0dfc 100644 --- a/src/ui/public/courier/fetch/call_response_handlers.js +++ b/src/ui/public/courier/fetch/call_response_handlers.js @@ -1,5 +1,3 @@ -import { RequestFailure } from 'ui/errors'; - import 'ui/promises'; import ReqStatusProvider from './req_status'; @@ -28,16 +26,12 @@ export default function CourierFetchCallResponseHandlers(Private, Promise) { return resp; } - if (resp.error) { - if (req.filterError(resp)) { - return progress(); + return Promise.try(function () { + if (resp instanceof Error) { + return req.handleFailure(resp); } else { - return req.handleFailure(new RequestFailure(null, resp)); + return req.handleResponse(resp); } - } - - return Promise.try(function () { - return req.handleResponse(resp); }) .then(progress); }); diff --git a/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js b/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js index 21d62a2bbb419..c7a7125bc1dfe 100644 --- a/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js +++ b/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js @@ -5,7 +5,7 @@ import ngMock from 'ngMock'; import SearchStrategyProvider from '../search'; -describe('ui/courier/fetch/strategy/search', () => { +describe('ui/courier es_search_strategy', () => { let Promise; let $rootScope; diff --git a/src/ui/public/courier/fetch_types/abstract_request.js b/src/ui/public/courier/fetch_types/abstract_request.js index 8daa30fca80ba..c7011ba7d7179 100644 --- a/src/ui/public/courier/fetch_types/abstract_request.js +++ b/src/ui/public/courier/fetch_types/abstract_request.js @@ -50,15 +50,6 @@ export default function AbstractRequestProvider(Private, Promise) { this.moment = moment(); } - /** - * Determine if a response should be considered an error - * - * @return {Boolean} - */ - filterError(resp) { - return false; - } - /** * Mark the request as successful * diff --git a/src/ui/public/courier/fetch_types/es_abstract_strategy.js b/src/ui/public/courier/fetch_types/es_abstract_strategy.js index 54b2654158441..043332c6eb7b2 100644 --- a/src/ui/public/courier/fetch_types/es_abstract_strategy.js +++ b/src/ui/public/courier/fetch_types/es_abstract_strategy.js @@ -1,6 +1,6 @@ import _ from 'lodash'; -import { SearchTimeout, ShardFailure } from 'ui/errors'; +import { RequestFailure, SearchTimeout, ShardFailure } from 'ui/errors'; import IsRequestProvider from '../fetch/is_request'; import ReqStatusProvider from '../fetch/req_status'; @@ -92,7 +92,11 @@ export default function EsClientExecutorProvider(Private, Promise, es, esShardTi let esPromise; const defer = Promise.defer(); - const warnAboutEsErrors = (resp) => { + const checkForEsError = (req, resp) => { + if (resp.error && req.filterError && !req.filterError(resp)) { + return new RequestFailure(null, resp); + } + if (resp.timed_out) { notify.warning(new SearchTimeout()); } @@ -100,6 +104,8 @@ export default function EsClientExecutorProvider(Private, Promise, es, esShardTi if (resp._shards && resp._shards.failed) { notify.warning(new ShardFailure(resp)); } + + return resp; }; // for each respond with either the response or ABORTED @@ -112,9 +118,7 @@ export default function EsClientExecutorProvider(Private, Promise, es, esShardTi case DUPLICATE: return req._uniq.resp; default: - const response = responses[_.findIndex(executable, req)]; - warnAboutEsErrors(response); - return response; + return checkForEsError(req, responses[_.findIndex(executable, req)]); } }) .then( diff --git a/src/ui/public/courier/fetch_types/es_segmented_request.js b/src/ui/public/courier/fetch_types/es_segmented_request.js index 3d87f3592c52a..d8a095a8c5af7 100644 --- a/src/ui/public/courier/fetch_types/es_segmented_request.js +++ b/src/ui/public/courier/fetch_types/es_segmented_request.js @@ -98,7 +98,9 @@ export default function SegmentedReqProvider(es, Private, Promise, timefilter, c } handleResponse(resp) { - return this._consumeSegment(resp); + if (!resp.error) { + return this._consumeSegment(resp); + } } filterError(resp) { From 1a947a1d11594ff4fd2bddbc3d721fde01cfc265 Mon Sep 17 00:00:00 2001 From: spalger Date: Wed, 24 Feb 2016 10:07:31 -0800 Subject: [PATCH 3/4] [courier/strategy] add an AbstractStrategy --- src/ui/public/courier/fetch_types/abstract_strategy.js | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 src/ui/public/courier/fetch_types/abstract_strategy.js diff --git a/src/ui/public/courier/fetch_types/abstract_strategy.js b/src/ui/public/courier/fetch_types/abstract_strategy.js new file mode 100644 index 0000000000000..097d55e25741b --- /dev/null +++ b/src/ui/public/courier/fetch_types/abstract_strategy.js @@ -0,0 +1,7 @@ +export default function AbstractStrategyProvider() { + return class AbstractStrategy { + exectute() { + throw new Error('Subclasses of AbstractStrategy must override the execute() method.'); + } + }; +} From 8994b39189a6d47ae0bd6dacdd7b882f8c08c386 Mon Sep 17 00:00:00 2001 From: spalger Date: Wed, 24 Feb 2016 10:49:47 -0800 Subject: [PATCH 4/4] [courier] update tests to match new changes --- .../public/courier/__tests__/requestQueue.js | 52 +++++++++---------- .../courier/fetch/__tests__/fetch_these.js | 20 ++++--- .../courier/fetch_types/__tests__/doc.js | 18 +++---- .../__tests__/es_search_strategy.js | 16 +++--- .../{segmented.js => es_segmented_request.js} | 22 ++++---- .../__tests__/segmented_create_queue.js | 14 ++--- .../__tests__/segmented_index_selection.js | 12 ++--- .../__tests__/segmented_size_picking.js | 12 ++--- 8 files changed, 85 insertions(+), 81 deletions(-) rename src/ui/public/courier/fetch_types/__tests__/{segmented.js => es_segmented_request.js} (66%) diff --git a/src/ui/public/courier/__tests__/requestQueue.js b/src/ui/public/courier/__tests__/requestQueue.js index 723f0f016ccd4..29ebfb11a9ce3 100644 --- a/src/ui/public/courier/__tests__/requestQueue.js +++ b/src/ui/public/courier/__tests__/requestQueue.js @@ -3,19 +3,19 @@ import expect from 'expect.js'; import sinon from 'auto-release-sinon'; import RequestQueueProv from '../_request_queue'; -import SearchStrategyProv from '../fetch/strategy/search'; -import DocStrategyProv from '../fetch/strategy/doc'; +import EsDocStrategyProv from '../fetch_types/es_doc_strategy'; +import EsSearchStrategyProv from '../fetch_types/es_search_strategy'; describe('Courier Request Queue', function () { - let docStrategy; let requestQueue; - let searchStrategy; + let esDocStrategy; + let esSearchStrategy; beforeEach(ngMock.module('kibana')); beforeEach(ngMock.inject(function (Private) { - docStrategy = Private(DocStrategyProv); requestQueue = Private(RequestQueueProv); - searchStrategy = Private(SearchStrategyProv); + esDocStrategy = Private(EsDocStrategyProv); + esSearchStrategy = Private(EsSearchStrategyProv); })); class MockReq { @@ -29,20 +29,20 @@ describe('Courier Request Queue', function () { describe('#getStartable(strategy)', function () { it('only returns requests that match one of the passed strategies', function () { requestQueue.push( - new MockReq(docStrategy), - new MockReq(searchStrategy), - new MockReq(searchStrategy), - new MockReq(searchStrategy) + new MockReq(esDocStrategy), + new MockReq(esSearchStrategy), + new MockReq(esSearchStrategy), + new MockReq(esSearchStrategy) ); - expect(requestQueue.getStartable(docStrategy)).to.have.length(1); - expect(requestQueue.getStartable(searchStrategy)).to.have.length(3); + expect(requestQueue.getStartable(esDocStrategy)).to.have.length(1); + expect(requestQueue.getStartable(esSearchStrategy)).to.have.length(3); }); it('returns all requests when no strategy passed', function () { requestQueue.push( - new MockReq(docStrategy), - new MockReq(searchStrategy) + new MockReq(esDocStrategy), + new MockReq(esSearchStrategy) ); expect(requestQueue.getStartable()).to.have.length(2); @@ -50,8 +50,8 @@ describe('Courier Request Queue', function () { it('returns only startable requests', function () { requestQueue.push( - new MockReq(docStrategy, true), - new MockReq(searchStrategy, false) + new MockReq(esDocStrategy, true), + new MockReq(esSearchStrategy, false) ); expect(requestQueue.getStartable()).to.have.length(1); @@ -61,20 +61,20 @@ describe('Courier Request Queue', function () { describe('#get(strategy)', function () { it('only returns requests that match one of the passed strategies', function () { requestQueue.push( - new MockReq(docStrategy), - new MockReq(searchStrategy), - new MockReq(searchStrategy), - new MockReq(searchStrategy) + new MockReq(esDocStrategy), + new MockReq(esSearchStrategy), + new MockReq(esSearchStrategy), + new MockReq(esSearchStrategy) ); - expect(requestQueue.get(docStrategy)).to.have.length(1); - expect(requestQueue.get(searchStrategy)).to.have.length(3); + expect(requestQueue.get(esDocStrategy)).to.have.length(1); + expect(requestQueue.get(esSearchStrategy)).to.have.length(3); }); it('returns all requests when no strategy passed', function () { requestQueue.push( - new MockReq(docStrategy), - new MockReq(searchStrategy) + new MockReq(esDocStrategy), + new MockReq(esSearchStrategy) ); expect(requestQueue.get()).to.have.length(2); @@ -82,8 +82,8 @@ describe('Courier Request Queue', function () { it('returns startable and not-startable requests', function () { requestQueue.push( - new MockReq(docStrategy, true), - new MockReq(searchStrategy, false) + new MockReq(esDocStrategy, true), + new MockReq(esSearchStrategy, false) ); expect(requestQueue.get()).to.have.length(2); diff --git a/src/ui/public/courier/fetch/__tests__/fetch_these.js b/src/ui/public/courier/fetch/__tests__/fetch_these.js index 456a9ec0e9ff1..6ce97ae0631da 100644 --- a/src/ui/public/courier/fetch/__tests__/fetch_these.js +++ b/src/ui/public/courier/fetch/__tests__/fetch_these.js @@ -3,9 +3,11 @@ import sinon from 'auto-release-sinon'; import expect from 'expect.js'; import ngMock from 'ngMock'; -import FetchTheseProvider from '../fetch/fetch_these'; +import FetchTheseProvider from '../fetch_these'; +import ContinueIncompleteProvider from '../continue_incomplete'; +import CallResponseHandlersProvider from '../call_response_handlers'; -describe('ui/courier/fetch/_fetch_these', () => { +describe('ui/courier - fetch_these', () => { let Promise; let $rootScope; @@ -24,9 +26,8 @@ describe('ui/courier/fetch/_fetch_these', () => { return fakeResponses; } - PrivateProvider.swap(require('ui/courier/fetch/_call_client'), FakeResponsesProvider); - PrivateProvider.swap(require('ui/courier/fetch/_call_response_handlers'), FakeResponsesProvider); - PrivateProvider.swap(require('ui/courier/fetch/_continue_incomplete'), FakeResponsesProvider); + PrivateProvider.swap(CallResponseHandlersProvider, FakeResponsesProvider); + PrivateProvider.swap(ContinueIncompleteProvider, FakeResponsesProvider); })); beforeEach(ngMock.inject((Private, $injector) => { @@ -53,7 +54,7 @@ describe('ui/courier/fetch/_fetch_these', () => { expect(request.start.callCount).to.be(1); expect(fakeResponses.callCount).to.be(0); $rootScope.$apply(); - expect(fakeResponses.callCount).to.be(3); + expect(fakeResponses.callCount).to.be(2); }); it('invokes request failure handler if starting fails', () => { @@ -77,7 +78,7 @@ describe('ui/courier/fetch/_fetch_these', () => { expect(request.continue.callCount).to.be(1); expect(fakeResponses.callCount).to.be(0); $rootScope.$apply(); - expect(fakeResponses.callCount).to.be(3); + expect(fakeResponses.callCount).to.be(2); }); it('invokes request failure handler if continuing fails', () => { request.continue = sinon.stub().returns(Promise.reject('some error')); @@ -89,7 +90,10 @@ describe('ui/courier/fetch/_fetch_these', () => { function mockRequest() { return { - strategy: 'mock', + strategy: { + type: 'mock', + execute() {} + }, started: true, aborted: false, handleFailure: sinon.spy(), diff --git a/src/ui/public/courier/fetch_types/__tests__/doc.js b/src/ui/public/courier/fetch_types/__tests__/doc.js index 6ce8059c14ba1..dc8f53f7898db 100644 --- a/src/ui/public/courier/fetch_types/__tests__/doc.js +++ b/src/ui/public/courier/fetch_types/__tests__/doc.js @@ -2,10 +2,10 @@ import sinon from 'auto-release-sinon'; import expect from 'expect.js'; import ngMock from 'ngMock'; -import DocSourceProvider from '../../data_source/doc_source'; -import DocRequestProvider from '../request/doc'; +import EsDocSourceProvider from '../../data_source/doc_source'; +import EsDocRequestProvider from '../es_doc_request'; -describe('Courier DocFetchRequest class', function () { +describe('Courier EsDocFetchRequest class', function () { let storage; let source; let defer; @@ -15,8 +15,8 @@ describe('Courier DocFetchRequest class', function () { beforeEach(ngMock.module('kibana')); beforeEach(ngMock.inject(function (Private, Promise, $injector) { - const DocSource = Private(DocSourceProvider); - const DocFetchRequest = Private(DocRequestProvider); + const EsDocSource = Private(EsDocSourceProvider); + const EsDocFetchRequest = Private(EsDocRequestProvider); storage = $injector.get('localStorage').store = @@ -27,22 +27,22 @@ describe('Courier DocFetchRequest class', function () { clear: sinon.stub() }; - source = new DocSource({}) + source = new EsDocSource({}) .set('index', 'doc-index') .set('type', 'doc-type') .set('id', 'doc-id'); defer = Promise.defer(); - req = new DocFetchRequest(source, defer); + req = new EsDocFetchRequest(source, defer); /** * Setup the version numbers for tests. There are two versions for the * purposes of these tests. * - * @param {number} mine - the version that the DocSource most + * @param {number} mine - the version that the EsDocSource most * recently received from elasticsearch. - * @param {number} theirs - the version that other DocSources have + * @param {number} theirs - the version that other EsDocSources have * received from elasticsearfch. */ setVersion = function (mine, theirs) { diff --git a/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js b/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js index c7a7125bc1dfe..75714ddb962fe 100644 --- a/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js +++ b/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js @@ -3,21 +3,21 @@ import sinon from 'auto-release-sinon'; import expect from 'expect.js'; import ngMock from 'ngMock'; -import SearchStrategyProvider from '../search'; +import EsSearchStrategyProvider from '../es_search_strategy'; describe('ui/courier es_search_strategy', () => { let Promise; let $rootScope; - let search; let reqsFetchParams; + let esSearchStrategy; beforeEach(ngMock.module('kibana')); beforeEach(ngMock.inject((Private, $injector) => { Promise = $injector.get('Promise'); $rootScope = $injector.get('$rootScope'); - search = Private(SearchStrategyProvider); + esSearchStrategy = Private(EsSearchStrategyProvider); reqsFetchParams = [ { index: ['logstash-123'], @@ -30,14 +30,14 @@ describe('ui/courier es_search_strategy', () => { describe('#clientMethod', () => { it('is msearch', () => { - expect(search.clientMethod).to.equal('msearch'); + expect(esSearchStrategy.clientMethod).to.equal('msearch'); }); }); describe('#reqsFetchParamsToBody()', () => { it('filters out any body properties that begin with $', () => { let value; - search.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); + esSearchStrategy.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); $rootScope.$apply(); expect(_.includes(value, 'foo')).to.be(true); expect(_.includes(value, '$foo')).to.be(false); @@ -46,7 +46,7 @@ describe('ui/courier es_search_strategy', () => { context('when indexList is not empty', () => { it('includes the index', () => { let value; - search.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); + esSearchStrategy.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); $rootScope.$apply(); expect(_.includes(value, '"index":["logstash-123"]')).to.be(true); }); @@ -57,7 +57,7 @@ describe('ui/courier es_search_strategy', () => { it('queries .kibana-devnull instead', () => { let value; - search.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); + esSearchStrategy.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); $rootScope.$apply(); expect(_.includes(value, '"index":[".kibana-devnull"]')).to.be(true); }); @@ -67,7 +67,7 @@ describe('ui/courier es_search_strategy', () => { describe('#getResponses()', () => { it('returns the `responses` property of the given arg', () => { const responses = [{}]; - const returned = search.getResponses({ responses }); + const returned = esSearchStrategy.getResponses({ responses }); expect(returned).to.be(responses); }); }); diff --git a/src/ui/public/courier/fetch_types/__tests__/segmented.js b/src/ui/public/courier/fetch_types/__tests__/es_segmented_request.js similarity index 66% rename from src/ui/public/courier/fetch_types/__tests__/segmented.js rename to src/ui/public/courier/fetch_types/__tests__/es_segmented_request.js index cabfae475a296..748c78a3065fb 100644 --- a/src/ui/public/courier/fetch_types/__tests__/segmented.js +++ b/src/ui/public/courier/fetch_types/__tests__/es_segmented_request.js @@ -2,30 +2,30 @@ import sinon from 'auto-release-sinon'; import expect from 'expect.js'; import ngMock from 'ngMock'; -import SegmentedRequestProvider from '../segmented'; -import SearchRequestProvider from '../search'; +import EsSegmentedRequestProvider from '../es_segmented_request'; +import EsSearchRequestProvider from '../es_search_request'; describe('ui/courier/fetch/request/segmented', () => { let Promise; let $rootScope; - let SegmentedReq; - let segmentedReq; - let searchReqStart; + let EsSegmentedReq; + let esSegmentedReq; + let esSearchReqStart; beforeEach(ngMock.module('kibana')); beforeEach(ngMock.inject((Private, $injector) => { Promise = $injector.get('Promise'); $rootScope = $injector.get('$rootScope'); - SegmentedReq = Private(SegmentedRequestProvider); - searchReqStart = sinon.spy(Private(SearchRequestProvider).prototype, 'start'); + EsSegmentedReq = Private(EsSegmentedRequestProvider); + esSearchReqStart = sinon.spy(Private(EsSearchRequestProvider).prototype, 'start'); })); describe('#start()', () => { let returned; beforeEach(() => { init(); - returned = segmentedReq.start(); + returned = esSegmentedReq.start(); }); it('returns promise', () => { @@ -33,14 +33,14 @@ describe('ui/courier/fetch/request/segmented', () => { }); it('does not call super.start() until promise is resolved', () => { - expect(searchReqStart.called).to.be(false); + expect(esSearchReqStart.called).to.be(false); $rootScope.$apply(); - expect(searchReqStart.called).to.be(true); + expect(esSearchReqStart.called).to.be(true); }); }); function init() { - segmentedReq = new SegmentedReq(mockSource()); + esSegmentedReq = new EsSegmentedReq(mockSource()); } function mockSource() { diff --git a/src/ui/public/courier/fetch_types/__tests__/segmented_create_queue.js b/src/ui/public/courier/fetch_types/__tests__/segmented_create_queue.js index e7a20f988090b..55004ddf9e391 100644 --- a/src/ui/public/courier/fetch_types/__tests__/segmented_create_queue.js +++ b/src/ui/public/courier/fetch_types/__tests__/segmented_create_queue.js @@ -4,14 +4,14 @@ import ngMock from 'ngMock'; import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source'; -import SegmentedRequestProvider from '../segmented'; +import EsSegmentedRequestProvider from '../es_segmented_request'; describe('ui/courier/fetch/request/segmented/_createQueue', () => { let Promise; - let $rootScope; - let SegmentedReq; let MockSource; + let $rootScope; + let EsSegmentedReq; require('testUtils/noDigestPromises').activateForSuite(); @@ -19,7 +19,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { beforeEach(ngMock.inject((Private, $injector) => { Promise = $injector.get('Promise'); $rootScope = $injector.get('$rootScope'); - SegmentedReq = Private(SegmentedRequestProvider); + EsSegmentedReq = Private(EsSegmentedRequestProvider); MockSource = class { constructor() { @@ -29,7 +29,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { })); it('manages the req._queueCreated flag', async function () { - const req = new SegmentedReq(new MockSource()); + const req = new EsSegmentedReq(new MockSource()); req._queueCreated = null; const promise = req._createQueue(); @@ -44,7 +44,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { const indices = [1,2,3]; sinon.stub(ip, 'toDetailedIndexList').returns(Promise.resolve(indices)); - const req = new SegmentedReq(source); + const req = new EsSegmentedReq(source); const output = await req._createQueue(); expect(output).to.equal(indices); }); @@ -52,7 +52,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { it('tells the index pattern its direction', async function () { const source = new MockSource(); const ip = source.get('index'); - const req = new SegmentedReq(source); + const req = new EsSegmentedReq(source); sinon.stub(ip, 'toDetailedIndexList').returns(Promise.resolve([1,2,3])); req.setDirection('asc'); diff --git a/src/ui/public/courier/fetch_types/__tests__/segmented_index_selection.js b/src/ui/public/courier/fetch_types/__tests__/segmented_index_selection.js index 89a2c1735a95c..c9810d3762bed 100644 --- a/src/ui/public/courier/fetch_types/__tests__/segmented_index_selection.js +++ b/src/ui/public/courier/fetch_types/__tests__/segmented_index_selection.js @@ -7,14 +7,14 @@ import HitSortFnProv from 'plugins/kibana/discover/_hit_sort_fn'; import NoDigestPromises from 'testUtils/noDigestPromises'; import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source'; -import SegmentedRequestProvider from '../segmented'; +import EsSegmentedRequestProvider from '../es_segmented_request'; describe('Segmented Request Index Selection', function () { let Promise; + let HitSortFn; let $rootScope; - let SegmentedReq; let MockSource; - let HitSortFn; + let EsSegmentedReq; NoDigestPromises.activateForSuite(); @@ -23,7 +23,7 @@ describe('Segmented Request Index Selection', function () { Promise = $injector.get('Promise'); HitSortFn = Private(HitSortFnProv); $rootScope = $injector.get('$rootScope'); - SegmentedReq = Private(SegmentedRequestProvider); + EsSegmentedReq = Private(EsSegmentedRequestProvider); MockSource = class { constructor() { @@ -43,7 +43,7 @@ describe('Segmented Request Index Selection', function () { { index: 'five', min: 0, max: 1 }, ])); - const req = new SegmentedReq(search); + const req = new EsSegmentedReq(search); req._handle.setDirection('desc'); req._handle.setSortFn(new HitSortFn('desc')); req._handle.setSize(500); @@ -94,7 +94,7 @@ describe('Segmented Request Index Selection', function () { { index: 'five', min: 5, max: 50 }, ])); - const req = new SegmentedReq(search); + const req = new EsSegmentedReq(search); req._handle.setDirection('desc'); req._handle.setSortFn(new HitSortFn('desc')); req._handle.setSize(10); diff --git a/src/ui/public/courier/fetch_types/__tests__/segmented_size_picking.js b/src/ui/public/courier/fetch_types/__tests__/segmented_size_picking.js index 3339b21d2e925..45e3982244c88 100644 --- a/src/ui/public/courier/fetch_types/__tests__/segmented_size_picking.js +++ b/src/ui/public/courier/fetch_types/__tests__/segmented_size_picking.js @@ -7,14 +7,14 @@ import HitSortFnProv from 'plugins/kibana/discover/_hit_sort_fn'; import NoDigestPromises from 'testUtils/noDigestPromises'; import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source'; -import SegmentedRequestProvider from '../segmented'; +import EsSegmentedRequestProvider from '../es_segmented_request'; describe('Segmented Request Size Picking', function () { let Promise; + let HitSortFn; let $rootScope; - let SegmentedReq; let MockSource; - let HitSortFn; + let EsSegmentedReq; NoDigestPromises.activateForSuite(); @@ -23,7 +23,7 @@ describe('Segmented Request Size Picking', function () { Promise = $injector.get('Promise'); HitSortFn = Private(HitSortFnProv); $rootScope = $injector.get('$rootScope'); - SegmentedReq = Private(SegmentedRequestProvider); + EsSegmentedReq = Private(EsSegmentedRequestProvider); MockSource = class { constructor() { @@ -34,7 +34,7 @@ describe('Segmented Request Size Picking', function () { context('without a size', function () { it('does not set the request size', async function () { - const req = new SegmentedReq(new MockSource()); + const req = new EsSegmentedReq(new MockSource()); req._handle.setDirection('desc'); req._handle.setSortFn(new HitSortFn('desc')); await req.start(); @@ -45,7 +45,7 @@ describe('Segmented Request Size Picking', function () { context('with a size', function () { it('sets the request size to the entire desired size', async function () { - const req = new SegmentedReq(new MockSource()); + const req = new EsSegmentedReq(new MockSource()); req._handle.setDirection('desc'); req._handle.setSize(555); req._handle.setSortFn(new HitSortFn('desc'));