Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Partial Results] Update esaggs expressions function to return partial results #105620

Merged
merged 3 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ export interface IExpressionLoaderParams
| [hasCompatibleActions](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.hascompatibleactions.md) | <code>ExpressionRenderHandlerParams['hasCompatibleActions']</code> | |
| [inspectorAdapters](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.inspectoradapters.md) | <code>Adapters</code> | |
| [onRenderError](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.onrendererror.md) | <code>RenderErrorHandlerFnType</code> | |
| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | <code>boolean</code> | |
| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | <code>boolean</code> | The flag to toggle on emitting partial results. By default, the partial results are disabled. |
| [renderMode](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.rendermode.md) | <code>RenderMode</code> | |
| [searchContext](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchcontext.md) | <code>SerializableState</code> | |
| [searchSessionId](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchsessionid.md) | <code>string</code> | |
| [syncColors](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.synccolors.md) | <code>boolean</code> | |
| [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md) | <code>number</code> | Throttling of partial results in milliseconds. By default, throttling is disabled. |
| [uiState](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.uistate.md) | <code>unknown</code> | |
| [variables](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.variables.md) | <code>Record&lt;string, any&gt;</code> | |

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

## IExpressionLoaderParams.partial property

The flag to toggle on emitting partial results. By default, the partial results are disabled.

<b>Signature:</b>

```typescript
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [kibana-plugin-plugins-expressions-public](./kibana-plugin-plugins-expressions-public.md) &gt; [IExpressionLoaderParams](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md) &gt; [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md)

## IExpressionLoaderParams.throttle property

Throttling of partial results in milliseconds. By default, throttling is disabled.

<b>Signature:</b>

```typescript
throttle?: number;
```
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

import { i18n } from '@kbn/i18n';
import { Observable } from 'rxjs';

import { Datatable, ExpressionFunctionDefinition } from 'src/plugins/expressions/common';

Expand All @@ -22,7 +23,7 @@ import { handleRequest } from './request_handler';
const name = 'esaggs';

type Input = KibanaContext | null;
type Output = Promise<Datatable>;
type Output = Observable<Datatable>;

interface Arguments {
index: IndexPatternExpressionType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Side Public License, v 1.
*/

import { from } from 'rxjs';
import type { MockedKeys } from '@kbn/utility-types/jest';
import type { Filter } from '../../../es_query';
import type { IndexPattern } from '../../../index_patterns';
Expand All @@ -21,6 +22,7 @@ jest.mock('../../tabify', () => ({

import { tabifyAggResponse } from '../../tabify';
import { of } from 'rxjs';
import { toArray } from 'rxjs/operators';

describe('esaggs expression function - public', () => {
let mockParams: MockedKeys<RequestHandlerParams>;
Expand Down Expand Up @@ -57,15 +59,15 @@ describe('esaggs expression function - public', () => {
});

test('should create a new search source instance', async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
expect(mockParams.searchSourceService.create).toHaveBeenCalledTimes(1);
});

describe('sets the expected fields on search source', () => {
let searchSource: MockedKeys<ISearchSource>;

beforeEach(async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
searchSource = await mockParams.searchSourceService.create();
});

Expand Down Expand Up @@ -100,7 +102,7 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
filters: mockFilters,
});
}).toPromise();
searchSource = await mockParams.searchSourceService.create();
expect((searchSource.setField as jest.Mock).mock.calls[3]).toEqual(['filter', mockFilters]);
});
Expand All @@ -118,14 +120,14 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
query: mockQuery,
});
}).toPromise();
searchSource = await mockParams.searchSourceService.create();
expect((searchSource.setField as jest.Mock).mock.calls[4]).toEqual(['query', mockQuery]);
});
});

test('calls searchSource.fetch', async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
const searchSource = await mockParams.searchSourceService.create();

expect(searchSource.fetch$).toHaveBeenCalledWith({
Expand All @@ -140,7 +142,7 @@ describe('esaggs expression function - public', () => {
});

test('tabifies response data', async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
expect(tabifyAggResponse).toHaveBeenCalledWith(
mockParams.aggs,
{},
Expand All @@ -155,7 +157,7 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
timeRange: { from: '2020-12-01', to: '2020-12-31' },
});
}).toPromise();
expect((tabifyAggResponse as jest.Mock).mock.calls[0][2].timeRange).toMatchInlineSnapshot(`
Object {
"from": "2020-12-01T05:00:00.000Z",
Expand All @@ -167,4 +169,29 @@ describe('esaggs expression function - public', () => {
}
`);
});

test('returns partial results', async () => {
const searchSource = await mockParams.searchSourceService.create();

(searchSource.fetch$ as jest.MockedFunction<typeof searchSource.fetch$>).mockReturnValue(
from([
{
rawResponse: {},
},
{
rawResponse: {},
},
]) as ReturnType<typeof searchSource.fetch$>
);

const result = await handleRequest({
...mockParams,
query: { query: 'foo', language: 'bar' },
})
.pipe(toArray())
.toPromise();

expect(result).toHaveLength(2);
expect(tabifyAggResponse).toHaveBeenCalledTimes(2);
});
});
168 changes: 89 additions & 79 deletions src/plugins/data/common/search/expressions/esaggs/request_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/

import { i18n } from '@kbn/i18n';
import { defer } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
import { Adapters } from 'src/plugins/inspector/common';

import { calculateBounds, Filter, IndexPattern, Query, TimeRange } from '../../../../common';
Expand All @@ -32,7 +34,7 @@ export interface RequestHandlerParams {
getNow?: () => Date;
}

export const handleRequest = async ({
export const handleRequest = ({
abortSignal,
aggs,
filters,
Expand All @@ -46,87 +48,95 @@ export const handleRequest = async ({
timeRange,
getNow,
}: RequestHandlerParams) => {
const forceNow = getNow?.();
const searchSource = await searchSourceService.create();

searchSource.setField('index', indexPattern);
searchSource.setField('size', 0);

// Create a new search source that inherits the original search source
// but has the appropriate timeRange applied via a filter.
// This is a temporary solution until we properly pass down all required
// information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
// Using callParentStartHandlers: true we make sure, that the parent searchSource
// onSearchRequestStart will be called properly even though we use an inherited
// search source.
const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
const requestSearchSource = timeFilterSearchSource.createChild({ callParentStartHandlers: true });

// If timeFields have been specified, use the specified ones, otherwise use primary time field of index
// pattern if it's available.
const defaultTimeField = indexPattern?.getTimeField?.();
const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields;

aggs.setTimeRange(timeRange as TimeRange);
aggs.setForceNow(forceNow);
aggs.setTimeFields(allTimeFields);

// For now we need to mirror the history of the passed search source, since
// the request inspector wouldn't work otherwise.
Object.defineProperty(requestSearchSource, 'history', {
get() {
return searchSource.history;
},
set(history) {
return (searchSource.history = history);
},
});

requestSearchSource.setField('aggs', aggs);

requestSearchSource.onRequestStart((paramSearchSource, options) => {
return aggs.onSearchRequestStart(paramSearchSource, options);
});

// If a timeRange has been specified and we had at least one timeField available, create range
// filters for that those time fields
if (timeRange && allTimeFields.length > 0) {
timeFilterSearchSource.setField('filter', () => {
return aggs.getSearchSourceTimeFilter(forceNow);
return defer(async () => {
const forceNow = getNow?.();
const searchSource = await searchSourceService.create();

searchSource.setField('index', indexPattern);
searchSource.setField('size', 0);

// Create a new search source that inherits the original search source
// but has the appropriate timeRange applied via a filter.
// This is a temporary solution until we properly pass down all required
// information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
// Using callParentStartHandlers: true we make sure, that the parent searchSource
// onSearchRequestStart will be called properly even though we use an inherited
// search source.
const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
const requestSearchSource = timeFilterSearchSource.createChild({
callParentStartHandlers: true,
});
}

requestSearchSource.setField('filter', filters);
requestSearchSource.setField('query', query);

const { rawResponse: response } = await requestSearchSource
.fetch$({
abortSignal,
sessionId: searchSessionId,
inspector: {
adapter: inspectorAdapters.requests,
title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
defaultMessage: 'Data',
}),
description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
defaultMessage:
'This request queries Elasticsearch to fetch the data for the visualization.',
}),

// If timeFields have been specified, use the specified ones, otherwise use primary time field of index
// pattern if it's available.
const defaultTimeField = indexPattern?.getTimeField?.();
const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
const allTimeFields = timeFields?.length ? timeFields : defaultTimeFields;

aggs.setTimeRange(timeRange as TimeRange);
aggs.setForceNow(forceNow);
aggs.setTimeFields(allTimeFields);

// For now we need to mirror the history of the passed search source, since
// the request inspector wouldn't work otherwise.
Object.defineProperty(requestSearchSource, 'history', {
get() {
return searchSource.history;
},
set(history) {
return (searchSource.history = history);
},
})
.toPromise();
});

const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
const tabifyParams = {
metricsAtAllLevels: aggs.hierarchical,
partialRows,
timeRange: parsedTimeRange
? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
: undefined,
};
requestSearchSource.setField('aggs', aggs);

const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams);
requestSearchSource.onRequestStart((paramSearchSource, options) => {
return aggs.onSearchRequestStart(paramSearchSource, options);
});

return tabifiedResponse;
// If a timeRange has been specified and we had at least one timeField available, create range
// filters for that those time fields
if (timeRange && allTimeFields.length > 0) {
timeFilterSearchSource.setField('filter', () => {
return aggs.getSearchSourceTimeFilter(forceNow);
});
}

requestSearchSource.setField('filter', filters);
requestSearchSource.setField('query', query);

return { allTimeFields, forceNow, requestSearchSource };
}).pipe(
switchMap(({ allTimeFields, forceNow, requestSearchSource }) =>
requestSearchSource
.fetch$({
abortSignal,
sessionId: searchSessionId,
inspector: {
adapter: inspectorAdapters.requests,
title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
defaultMessage: 'Data',
}),
description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
defaultMessage:
'This request queries Elasticsearch to fetch the data for the visualization.',
}),
},
})
.pipe(
map(({ rawResponse: response }) => {
const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
const tabifyParams = {
metricsAtAllLevels: aggs.hierarchical,
partialRows,
timeRange: parsedTimeRange
? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
: undefined,
};

return tabifyAggResponse(aggs, response, tabifyParams);
})
)
)
);
};
Loading