Skip to content

Commit

Permalink
[Partial Results] Update esaggs expressions function to return part…
Browse files Browse the repository at this point in the history
…ial results (#105620)

* Update `esaggs` expressions function to support partial results
* Add partial results throttling in the expressions loader
  • Loading branch information
dokmic authored Jul 19, 2021
1 parent dc45560 commit 15a613f
Show file tree
Hide file tree
Showing 14 changed files with 267 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ export interface IExpressionLoaderParams
| [hasCompatibleActions](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.hascompatibleactions.md) | <code>ExpressionRenderHandlerParams['hasCompatibleActions']</code> | |
| [inspectorAdapters](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.inspectoradapters.md) | <code>Adapters</code> | |
| [onRenderError](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.onrendererror.md) | <code>RenderErrorHandlerFnType</code> | |
| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | <code>boolean</code> | |
| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | <code>boolean</code> | The flag to toggle on emitting partial results. By default, the partial results are disabled. |
| [renderMode](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.rendermode.md) | <code>RenderMode</code> | |
| [searchContext](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchcontext.md) | <code>SerializableState</code> | |
| [searchSessionId](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchsessionid.md) | <code>string</code> | |
| [syncColors](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.synccolors.md) | <code>boolean</code> | |
| [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md) | <code>number</code> | Throttling of partial results in milliseconds. By default, throttling is disabled. |
| [uiState](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.uistate.md) | <code>unknown</code> | |
| [variables](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.variables.md) | <code>Record&lt;string, any&gt;</code> | |

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

## IExpressionLoaderParams.partial property

The flag to toggle on emitting partial results. By default, the partial results are disabled.

<b>Signature:</b>

```typescript
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [kibana-plugin-plugins-expressions-public](./kibana-plugin-plugins-expressions-public.md) &gt; [IExpressionLoaderParams](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md) &gt; [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md)

## IExpressionLoaderParams.throttle property

Throttling of partial results in milliseconds. By default, throttling is disabled.

<b>Signature:</b>

```typescript
throttle?: number;
```
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

import { i18n } from '@kbn/i18n';
import { Observable } from 'rxjs';

import { Datatable, ExpressionFunctionDefinition } from 'src/plugins/expressions/common';

Expand All @@ -22,7 +23,7 @@ import { handleRequest } from './request_handler';
const name = 'esaggs';

type Input = KibanaContext | null;
type Output = Promise<Datatable>;
type Output = Observable<Datatable>;

interface Arguments {
index: IndexPatternExpressionType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Side Public License, v 1.
*/

import { from } from 'rxjs';
import type { MockedKeys } from '@kbn/utility-types/jest';
import type { Filter } from '../../../es_query';
import type { IndexPattern } from '../../../index_patterns';
Expand All @@ -21,6 +22,7 @@ jest.mock('../../tabify', () => ({

import { tabifyAggResponse } from '../../tabify';
import { of } from 'rxjs';
import { toArray } from 'rxjs/operators';

describe('esaggs expression function - public', () => {
let mockParams: MockedKeys<RequestHandlerParams>;
Expand Down Expand Up @@ -57,15 +59,15 @@ describe('esaggs expression function - public', () => {
});

test('should create a new search source instance', async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
expect(mockParams.searchSourceService.create).toHaveBeenCalledTimes(1);
});

describe('sets the expected fields on search source', () => {
let searchSource: MockedKeys<ISearchSource>;

beforeEach(async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
searchSource = await mockParams.searchSourceService.create();
});

Expand Down Expand Up @@ -100,7 +102,7 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
filters: mockFilters,
});
}).toPromise();
searchSource = await mockParams.searchSourceService.create();
expect((searchSource.setField as jest.Mock).mock.calls[3]).toEqual(['filter', mockFilters]);
});
Expand All @@ -118,14 +120,14 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
query: mockQuery,
});
}).toPromise();
searchSource = await mockParams.searchSourceService.create();
expect((searchSource.setField as jest.Mock).mock.calls[4]).toEqual(['query', mockQuery]);
});
});

test('calls searchSource.fetch', async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
const searchSource = await mockParams.searchSourceService.create();

expect(searchSource.fetch$).toHaveBeenCalledWith({
Expand All @@ -140,7 +142,7 @@ describe('esaggs expression function - public', () => {
});

test('tabifies response data', async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
expect(tabifyAggResponse).toHaveBeenCalledWith(
mockParams.aggs,
{},
Expand All @@ -155,7 +157,7 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
timeRange: { from: '2020-12-01', to: '2020-12-31' },
});
}).toPromise();
expect((tabifyAggResponse as jest.Mock).mock.calls[0][2].timeRange).toMatchInlineSnapshot(`
Object {
"from": "2020-12-01T05:00:00.000Z",
Expand All @@ -167,4 +169,29 @@ describe('esaggs expression function - public', () => {
}
`);
});

test('returns partial results', async () => {
const searchSource = await mockParams.searchSourceService.create();

(searchSource.fetch$ as jest.MockedFunction<typeof searchSource.fetch$>).mockReturnValue(
from([
{
rawResponse: {},
},
{
rawResponse: {},
},
]) as ReturnType<typeof searchSource.fetch$>
);

const result = await handleRequest({
...mockParams,
query: { query: 'foo', language: 'bar' },
})
.pipe(toArray())
.toPromise();

expect(result).toHaveLength(2);
expect(tabifyAggResponse).toHaveBeenCalledTimes(2);
});
});
168 changes: 89 additions & 79 deletions src/plugins/data/common/search/expressions/esaggs/request_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/

import { i18n } from '@kbn/i18n';
import { defer } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
import { Adapters } from 'src/plugins/inspector/common';

import { calculateBounds, Filter, IndexPattern, Query, TimeRange } from '../../../../common';
Expand All @@ -32,7 +34,7 @@ export interface RequestHandlerParams {
getNow?: () => Date;
}

export const handleRequest = async ({
export const handleRequest = ({
abortSignal,
aggs,
filters,
Expand All @@ -46,87 +48,95 @@ export const handleRequest = async ({
timeRange,
getNow,
}: RequestHandlerParams) => {
const forceNow = getNow?.();
const searchSource = await searchSourceService.create();

searchSource.setField('index', indexPattern);
searchSource.setField('size', 0);

// Create a new search source that inherits the original search source
// but has the appropriate timeRange applied via a filter.
// This is a temporary solution until we properly pass down all required
// information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
// Using callParentStartHandlers: true we make sure, that the parent searchSource
// onSearchRequestStart will be called properly even though we use an inherited
// search source.
const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
const requestSearchSource = timeFilterSearchSource.createChild({ callParentStartHandlers: true });

// If timeFields have been specified, use the specified ones, otherwise use primary time field of index
// pattern if it's available.
const defaultTimeField = indexPattern?.getTimeField?.();
const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields;

aggs.setTimeRange(timeRange as TimeRange);
aggs.setForceNow(forceNow);
aggs.setTimeFields(allTimeFields);

// For now we need to mirror the history of the passed search source, since
// the request inspector wouldn't work otherwise.
Object.defineProperty(requestSearchSource, 'history', {
get() {
return searchSource.history;
},
set(history) {
return (searchSource.history = history);
},
});

requestSearchSource.setField('aggs', aggs);

requestSearchSource.onRequestStart((paramSearchSource, options) => {
return aggs.onSearchRequestStart(paramSearchSource, options);
});

// If a timeRange has been specified and we had at least one timeField available, create range
// filters for that those time fields
if (timeRange && allTimeFields.length > 0) {
timeFilterSearchSource.setField('filter', () => {
return aggs.getSearchSourceTimeFilter(forceNow);
return defer(async () => {
const forceNow = getNow?.();
const searchSource = await searchSourceService.create();

searchSource.setField('index', indexPattern);
searchSource.setField('size', 0);

// Create a new search source that inherits the original search source
// but has the appropriate timeRange applied via a filter.
// This is a temporary solution until we properly pass down all required
// information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
// Using callParentStartHandlers: true we make sure, that the parent searchSource
// onSearchRequestStart will be called properly even though we use an inherited
// search source.
const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
const requestSearchSource = timeFilterSearchSource.createChild({
callParentStartHandlers: true,
});
}

requestSearchSource.setField('filter', filters);
requestSearchSource.setField('query', query);

const { rawResponse: response } = await requestSearchSource
.fetch$({
abortSignal,
sessionId: searchSessionId,
inspector: {
adapter: inspectorAdapters.requests,
title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
defaultMessage: 'Data',
}),
description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
defaultMessage:
'This request queries Elasticsearch to fetch the data for the visualization.',
}),

// If timeFields have been specified, use the specified ones, otherwise use primary time field of index
// pattern if it's available.
const defaultTimeField = indexPattern?.getTimeField?.();
const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
const allTimeFields = timeFields?.length ? timeFields : defaultTimeFields;

aggs.setTimeRange(timeRange as TimeRange);
aggs.setForceNow(forceNow);
aggs.setTimeFields(allTimeFields);

// For now we need to mirror the history of the passed search source, since
// the request inspector wouldn't work otherwise.
Object.defineProperty(requestSearchSource, 'history', {
get() {
return searchSource.history;
},
set(history) {
return (searchSource.history = history);
},
})
.toPromise();
});

const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
const tabifyParams = {
metricsAtAllLevels: aggs.hierarchical,
partialRows,
timeRange: parsedTimeRange
? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
: undefined,
};
requestSearchSource.setField('aggs', aggs);

const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams);
requestSearchSource.onRequestStart((paramSearchSource, options) => {
return aggs.onSearchRequestStart(paramSearchSource, options);
});

return tabifiedResponse;
// If a timeRange has been specified and we had at least one timeField available, create range
// filters for that those time fields
if (timeRange && allTimeFields.length > 0) {
timeFilterSearchSource.setField('filter', () => {
return aggs.getSearchSourceTimeFilter(forceNow);
});
}

requestSearchSource.setField('filter', filters);
requestSearchSource.setField('query', query);

return { allTimeFields, forceNow, requestSearchSource };
}).pipe(
switchMap(({ allTimeFields, forceNow, requestSearchSource }) =>
requestSearchSource
.fetch$({
abortSignal,
sessionId: searchSessionId,
inspector: {
adapter: inspectorAdapters.requests,
title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
defaultMessage: 'Data',
}),
description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
defaultMessage:
'This request queries Elasticsearch to fetch the data for the visualization.',
}),
},
})
.pipe(
map(({ rawResponse: response }) => {
const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
const tabifyParams = {
metricsAtAllLevels: aggs.hierarchical,
partialRows,
timeRange: parsedTimeRange
? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
: undefined,
};

return tabifyAggResponse(aggs, response, tabifyParams);
})
)
)
);
};
Loading

0 comments on commit 15a613f

Please sign in to comment.