diff --git a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md
index 69f9d380422b6..1bee155250252 100644
--- a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md
+++ b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md
@@ -22,11 +22,12 @@ export interface IExpressionLoaderParams
| [hasCompatibleActions](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.hascompatibleactions.md) | ExpressionRenderHandlerParams['hasCompatibleActions']
| |
| [inspectorAdapters](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.inspectoradapters.md) | Adapters
| |
| [onRenderError](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.onrendererror.md) | RenderErrorHandlerFnType
| |
-| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | boolean
| |
+| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | boolean
| The flag to toggle on emitting partial results. By default, the partial results are disabled. |
| [renderMode](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.rendermode.md) | RenderMode
| |
| [searchContext](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchcontext.md) | SerializableState
| |
| [searchSessionId](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchsessionid.md) | string
| |
| [syncColors](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.synccolors.md) | boolean
| |
+| [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md) | number
| Throttling of partial results in milliseconds. By default, throttling is disabled. |
| [uiState](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.uistate.md) | unknown
| |
| [variables](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.variables.md) | Record<string, any>
| |
diff --git a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md
index 84c42c3f59f26..8922b2d0f377e 100644
--- a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md
+++ b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md
@@ -4,6 +4,8 @@
## IExpressionLoaderParams.partial property
+The flag to toggle on emitting partial results. By default, the partial results are disabled.
+
Signature:
```typescript
diff --git a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md
new file mode 100644
index 0000000000000..3383bce879776
--- /dev/null
+++ b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md
@@ -0,0 +1,13 @@
+
+
+[Home](./index.md) > [kibana-plugin-plugins-expressions-public](./kibana-plugin-plugins-expressions-public.md) > [IExpressionLoaderParams](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md) > [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md)
+
+## IExpressionLoaderParams.throttle property
+
+Throttling of partial results in milliseconds. By default, throttling is disabled.
+
+Signature:
+
+```typescript
+throttle?: number;
+```
diff --git a/src/plugins/data/common/search/expressions/esaggs/esaggs_fn.ts b/src/plugins/data/common/search/expressions/esaggs/esaggs_fn.ts
index c331ba6b4b9a6..f5cb7e9574718 100644
--- a/src/plugins/data/common/search/expressions/esaggs/esaggs_fn.ts
+++ b/src/plugins/data/common/search/expressions/esaggs/esaggs_fn.ts
@@ -7,6 +7,7 @@
*/
import { i18n } from '@kbn/i18n';
+import { Observable } from 'rxjs';
import { Datatable, ExpressionFunctionDefinition } from 'src/plugins/expressions/common';
@@ -22,7 +23,7 @@ import { handleRequest } from './request_handler';
const name = 'esaggs';
type Input = KibanaContext | null;
-type Output = Promise;
+type Output = Observable;
interface Arguments {
index: IndexPatternExpressionType;
diff --git a/src/plugins/data/common/search/expressions/esaggs/request_handler.test.ts b/src/plugins/data/common/search/expressions/esaggs/request_handler.test.ts
index 4f255cf4c244c..dae3661f00c27 100644
--- a/src/plugins/data/common/search/expressions/esaggs/request_handler.test.ts
+++ b/src/plugins/data/common/search/expressions/esaggs/request_handler.test.ts
@@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
+import { from } from 'rxjs';
import type { MockedKeys } from '@kbn/utility-types/jest';
import type { Filter } from '../../../es_query';
import type { IndexPattern } from '../../../index_patterns';
@@ -21,6 +22,7 @@ jest.mock('../../tabify', () => ({
import { tabifyAggResponse } from '../../tabify';
import { of } from 'rxjs';
+import { toArray } from 'rxjs/operators';
describe('esaggs expression function - public', () => {
let mockParams: MockedKeys;
@@ -57,7 +59,7 @@ describe('esaggs expression function - public', () => {
});
test('should create a new search source instance', async () => {
- await handleRequest(mockParams);
+ await handleRequest(mockParams).toPromise();
expect(mockParams.searchSourceService.create).toHaveBeenCalledTimes(1);
});
@@ -65,7 +67,7 @@ describe('esaggs expression function - public', () => {
let searchSource: MockedKeys;
beforeEach(async () => {
- await handleRequest(mockParams);
+ await handleRequest(mockParams).toPromise();
searchSource = await mockParams.searchSourceService.create();
});
@@ -100,7 +102,7 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
filters: mockFilters,
- });
+ }).toPromise();
searchSource = await mockParams.searchSourceService.create();
expect((searchSource.setField as jest.Mock).mock.calls[3]).toEqual(['filter', mockFilters]);
});
@@ -118,14 +120,14 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
query: mockQuery,
- });
+ }).toPromise();
searchSource = await mockParams.searchSourceService.create();
expect((searchSource.setField as jest.Mock).mock.calls[4]).toEqual(['query', mockQuery]);
});
});
test('calls searchSource.fetch', async () => {
- await handleRequest(mockParams);
+ await handleRequest(mockParams).toPromise();
const searchSource = await mockParams.searchSourceService.create();
expect(searchSource.fetch$).toHaveBeenCalledWith({
@@ -140,7 +142,7 @@ describe('esaggs expression function - public', () => {
});
test('tabifies response data', async () => {
- await handleRequest(mockParams);
+ await handleRequest(mockParams).toPromise();
expect(tabifyAggResponse).toHaveBeenCalledWith(
mockParams.aggs,
{},
@@ -155,7 +157,7 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
timeRange: { from: '2020-12-01', to: '2020-12-31' },
- });
+ }).toPromise();
expect((tabifyAggResponse as jest.Mock).mock.calls[0][2].timeRange).toMatchInlineSnapshot(`
Object {
"from": "2020-12-01T05:00:00.000Z",
@@ -167,4 +169,29 @@ describe('esaggs expression function - public', () => {
}
`);
});
+
+ test('returns partial results', async () => {
+ const searchSource = await mockParams.searchSourceService.create();
+
+ (searchSource.fetch$ as jest.MockedFunction).mockReturnValue(
+ from([
+ {
+ rawResponse: {},
+ },
+ {
+ rawResponse: {},
+ },
+ ]) as ReturnType
+ );
+
+ const result = await handleRequest({
+ ...mockParams,
+ query: { query: 'foo', language: 'bar' },
+ })
+ .pipe(toArray())
+ .toPromise();
+
+ expect(result).toHaveLength(2);
+ expect(tabifyAggResponse).toHaveBeenCalledTimes(2);
+ });
});
diff --git a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts
index bf931966f5bae..f697138b13615 100644
--- a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts
+++ b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts
@@ -7,6 +7,8 @@
*/
import { i18n } from '@kbn/i18n';
+import { defer } from 'rxjs';
+import { map, switchMap } from 'rxjs/operators';
import { Adapters } from 'src/plugins/inspector/common';
import { calculateBounds, Filter, IndexPattern, Query, TimeRange } from '../../../../common';
@@ -32,7 +34,7 @@ export interface RequestHandlerParams {
getNow?: () => Date;
}
-export const handleRequest = async ({
+export const handleRequest = ({
abortSignal,
aggs,
filters,
@@ -46,87 +48,95 @@ export const handleRequest = async ({
timeRange,
getNow,
}: RequestHandlerParams) => {
- const forceNow = getNow?.();
- const searchSource = await searchSourceService.create();
-
- searchSource.setField('index', indexPattern);
- searchSource.setField('size', 0);
-
- // Create a new search source that inherits the original search source
- // but has the appropriate timeRange applied via a filter.
- // This is a temporary solution until we properly pass down all required
- // information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
- // Using callParentStartHandlers: true we make sure, that the parent searchSource
- // onSearchRequestStart will be called properly even though we use an inherited
- // search source.
- const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
- const requestSearchSource = timeFilterSearchSource.createChild({ callParentStartHandlers: true });
-
- // If timeFields have been specified, use the specified ones, otherwise use primary time field of index
- // pattern if it's available.
- const defaultTimeField = indexPattern?.getTimeField?.();
- const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
- const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields;
-
- aggs.setTimeRange(timeRange as TimeRange);
- aggs.setForceNow(forceNow);
- aggs.setTimeFields(allTimeFields);
-
- // For now we need to mirror the history of the passed search source, since
- // the request inspector wouldn't work otherwise.
- Object.defineProperty(requestSearchSource, 'history', {
- get() {
- return searchSource.history;
- },
- set(history) {
- return (searchSource.history = history);
- },
- });
-
- requestSearchSource.setField('aggs', aggs);
-
- requestSearchSource.onRequestStart((paramSearchSource, options) => {
- return aggs.onSearchRequestStart(paramSearchSource, options);
- });
-
- // If a timeRange has been specified and we had at least one timeField available, create range
- // filters for that those time fields
- if (timeRange && allTimeFields.length > 0) {
- timeFilterSearchSource.setField('filter', () => {
- return aggs.getSearchSourceTimeFilter(forceNow);
+ return defer(async () => {
+ const forceNow = getNow?.();
+ const searchSource = await searchSourceService.create();
+
+ searchSource.setField('index', indexPattern);
+ searchSource.setField('size', 0);
+
+ // Create a new search source that inherits the original search source
+ // but has the appropriate timeRange applied via a filter.
+ // This is a temporary solution until we properly pass down all required
+ // information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
+ // Using callParentStartHandlers: true we make sure, that the parent searchSource
+ // onSearchRequestStart will be called properly even though we use an inherited
+ // search source.
+ const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
+ const requestSearchSource = timeFilterSearchSource.createChild({
+ callParentStartHandlers: true,
});
- }
-
- requestSearchSource.setField('filter', filters);
- requestSearchSource.setField('query', query);
-
- const { rawResponse: response } = await requestSearchSource
- .fetch$({
- abortSignal,
- sessionId: searchSessionId,
- inspector: {
- adapter: inspectorAdapters.requests,
- title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
- defaultMessage: 'Data',
- }),
- description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
- defaultMessage:
- 'This request queries Elasticsearch to fetch the data for the visualization.',
- }),
+
+ // If timeFields have been specified, use the specified ones, otherwise use primary time field of index
+ // pattern if it's available.
+ const defaultTimeField = indexPattern?.getTimeField?.();
+ const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
+ const allTimeFields = timeFields?.length ? timeFields : defaultTimeFields;
+
+ aggs.setTimeRange(timeRange as TimeRange);
+ aggs.setForceNow(forceNow);
+ aggs.setTimeFields(allTimeFields);
+
+ // For now we need to mirror the history of the passed search source, since
+ // the request inspector wouldn't work otherwise.
+ Object.defineProperty(requestSearchSource, 'history', {
+ get() {
+ return searchSource.history;
+ },
+ set(history) {
+ return (searchSource.history = history);
},
- })
- .toPromise();
+ });
- const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
- const tabifyParams = {
- metricsAtAllLevels: aggs.hierarchical,
- partialRows,
- timeRange: parsedTimeRange
- ? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
- : undefined,
- };
+ requestSearchSource.setField('aggs', aggs);
- const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams);
+ requestSearchSource.onRequestStart((paramSearchSource, options) => {
+ return aggs.onSearchRequestStart(paramSearchSource, options);
+ });
- return tabifiedResponse;
+ // If a timeRange has been specified and we had at least one timeField available, create range
+ // filters for that those time fields
+ if (timeRange && allTimeFields.length > 0) {
+ timeFilterSearchSource.setField('filter', () => {
+ return aggs.getSearchSourceTimeFilter(forceNow);
+ });
+ }
+
+ requestSearchSource.setField('filter', filters);
+ requestSearchSource.setField('query', query);
+
+ return { allTimeFields, forceNow, requestSearchSource };
+ }).pipe(
+ switchMap(({ allTimeFields, forceNow, requestSearchSource }) =>
+ requestSearchSource
+ .fetch$({
+ abortSignal,
+ sessionId: searchSessionId,
+ inspector: {
+ adapter: inspectorAdapters.requests,
+ title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
+ defaultMessage: 'Data',
+ }),
+ description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
+ defaultMessage:
+ 'This request queries Elasticsearch to fetch the data for the visualization.',
+ }),
+ },
+ })
+ .pipe(
+ map(({ rawResponse: response }) => {
+ const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
+ const tabifyParams = {
+ metricsAtAllLevels: aggs.hierarchical,
+ partialRows,
+ timeRange: parsedTimeRange
+ ? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
+ : undefined,
+ };
+
+ return tabifyAggResponse(aggs, response, tabifyParams);
+ })
+ )
+ )
+ );
};
diff --git a/src/plugins/data/public/search/expressions/esaggs.test.ts b/src/plugins/data/public/search/expressions/esaggs.test.ts
index e75bd7be219de..11dfe67838d34 100644
--- a/src/plugins/data/public/search/expressions/esaggs.test.ts
+++ b/src/plugins/data/public/search/expressions/esaggs.test.ts
@@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
+import { of as mockOf } from 'rxjs';
import type { MockedKeys } from '@kbn/utility-types/jest';
import type { ExecutionContext } from 'src/plugins/expressions/public';
import type { IndexPatternsContract } from '../../../common/index_patterns/index_patterns';
@@ -20,7 +21,7 @@ import { getFunctionDefinition } from './esaggs';
jest.mock('../../../common/search/expressions', () => ({
getEsaggsMeta: jest.fn().mockReturnValue({ name: 'esaggs' }),
- handleEsaggsRequest: jest.fn().mockResolvedValue({}),
+ handleEsaggsRequest: jest.fn(() => mockOf({})),
}));
import { getEsaggsMeta, handleEsaggsRequest } from '../../../common/search/expressions';
@@ -74,13 +75,13 @@ describe('esaggs expression function - public', () => {
});
test('calls indexPatterns.create with the values provided by the subexpression arg', async () => {
- await definition().fn(null, args, mockHandlers);
+ await definition().fn(null, args, mockHandlers).toPromise();
expect(startDependencies.indexPatterns.create).toHaveBeenCalledWith(args.index.value, true);
});
test('calls aggs.createAggConfigs with the values provided by the subexpression arg', async () => {
- await definition().fn(null, args, mockHandlers);
+ await definition().fn(null, args, mockHandlers).toPromise();
expect(startDependencies.aggs.createAggConfigs).toHaveBeenCalledWith(
{},
@@ -96,7 +97,7 @@ describe('esaggs expression function - public', () => {
});
test('calls handleEsaggsRequest with all of the right dependencies', async () => {
- await definition().fn(null, args, mockHandlers);
+ await definition().fn(null, args, mockHandlers).toPromise();
expect(handleEsaggsRequest).toHaveBeenCalledWith({
abortSignal: mockHandlers.abortSignal,
@@ -128,7 +129,7 @@ describe('esaggs expression function - public', () => {
timeRange: { from: 'a', to: 'b' },
} as KibanaContext;
- await definition().fn(input, args, mockHandlers);
+ await definition().fn(input, args, mockHandlers).toPromise();
expect(handleEsaggsRequest).toHaveBeenCalledWith(
expect.objectContaining({
diff --git a/src/plugins/data/public/search/expressions/esaggs.ts b/src/plugins/data/public/search/expressions/esaggs.ts
index 1e3d56c71e423..6d658d44980d1 100644
--- a/src/plugins/data/public/search/expressions/esaggs.ts
+++ b/src/plugins/data/public/search/expressions/esaggs.ts
@@ -7,6 +7,8 @@
*/
import { get } from 'lodash';
+import { defer } from 'rxjs';
+import { switchMap } from 'rxjs/operators';
import { StartServicesAccessor } from 'src/core/public';
import {
EsaggsExpressionFunctionDefinition,
@@ -35,30 +37,36 @@ export function getFunctionDefinition({
}) {
return (): EsaggsExpressionFunctionDefinition => ({
...getEsaggsMeta(),
- async fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId }) {
- const { aggs, indexPatterns, searchSource, getNow } = await getStartDependencies();
+ fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId }) {
+ return defer(async () => {
+ const { aggs, indexPatterns, searchSource, getNow } = await getStartDependencies();
- const indexPattern = await indexPatterns.create(args.index.value, true);
- const aggConfigs = aggs.createAggConfigs(
- indexPattern,
- args.aggs!.map((agg) => agg.value)
- );
- aggConfigs.hierarchical = args.metricsAtAllLevels;
+ const indexPattern = await indexPatterns.create(args.index.value, true);
+ const aggConfigs = aggs.createAggConfigs(
+ indexPattern,
+ args.aggs!.map((agg) => agg.value)
+ );
+ aggConfigs.hierarchical = args.metricsAtAllLevels;
- return await handleEsaggsRequest({
- abortSignal,
- aggs: aggConfigs,
- filters: get(input, 'filters', undefined),
- indexPattern,
- inspectorAdapters,
- partialRows: args.partialRows,
- query: get(input, 'query', undefined) as any,
- searchSessionId: getSearchSessionId(),
- searchSourceService: searchSource,
- timeFields: args.timeFields,
- timeRange: get(input, 'timeRange', undefined),
- getNow,
- });
+ return { aggConfigs, indexPattern, searchSource, getNow };
+ }).pipe(
+ switchMap(({ aggConfigs, indexPattern, searchSource, getNow }) =>
+ handleEsaggsRequest({
+ abortSignal,
+ aggs: aggConfigs,
+ filters: get(input, 'filters', undefined),
+ indexPattern,
+ inspectorAdapters,
+ partialRows: args.partialRows,
+ query: get(input, 'query', undefined) as any,
+ searchSessionId: getSearchSessionId(),
+ searchSourceService: searchSource,
+ timeFields: args.timeFields,
+ timeRange: get(input, 'timeRange', undefined),
+ getNow,
+ })
+ )
+ );
},
});
}
diff --git a/src/plugins/data/server/search/expressions/esaggs.test.ts b/src/plugins/data/server/search/expressions/esaggs.test.ts
index 15287e9d8cf5b..7c1f7626f491b 100644
--- a/src/plugins/data/server/search/expressions/esaggs.test.ts
+++ b/src/plugins/data/server/search/expressions/esaggs.test.ts
@@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
+import { of as mockOf } from 'rxjs';
import type { MockedKeys } from '@kbn/utility-types/jest';
import { KibanaRequest } from 'src/core/server';
import type { ExecutionContext } from 'src/plugins/expressions/server';
@@ -21,7 +22,7 @@ import { getFunctionDefinition } from './esaggs';
jest.mock('../../../common/search/expressions', () => ({
getEsaggsMeta: jest.fn().mockReturnValue({ name: 'esaggs' }),
- handleEsaggsRequest: jest.fn().mockResolvedValue({}),
+ handleEsaggsRequest: jest.fn(() => mockOf({})),
}));
import { getEsaggsMeta, handleEsaggsRequest } from '../../../common/search/expressions';
@@ -76,19 +77,19 @@ describe('esaggs expression function - server', () => {
});
test('calls getStartDependencies with the KibanaRequest', async () => {
- await definition().fn(null, args, mockHandlers);
+ await definition().fn(null, args, mockHandlers).toPromise();
expect(getStartDependencies).toHaveBeenCalledWith({ id: 'hi' });
});
test('calls indexPatterns.create with the values provided by the subexpression arg', async () => {
- await definition().fn(null, args, mockHandlers);
+ await definition().fn(null, args, mockHandlers).toPromise();
expect(startDependencies.indexPatterns.create).toHaveBeenCalledWith(args.index.value, true);
});
test('calls aggs.createAggConfigs with the values provided by the subexpression arg', async () => {
- await definition().fn(null, args, mockHandlers);
+ await definition().fn(null, args, mockHandlers).toPromise();
expect(startDependencies.aggs.createAggConfigs).toHaveBeenCalledWith(
{},
@@ -104,7 +105,7 @@ describe('esaggs expression function - server', () => {
});
test('calls handleEsaggsRequest with all of the right dependencies', async () => {
- await definition().fn(null, args, mockHandlers);
+ await definition().fn(null, args, mockHandlers).toPromise();
expect(handleEsaggsRequest).toHaveBeenCalledWith({
abortSignal: mockHandlers.abortSignal,
@@ -135,7 +136,7 @@ describe('esaggs expression function - server', () => {
timeRange: { from: 'a', to: 'b' },
} as KibanaContext;
- await definition().fn(input, args, mockHandlers);
+ await definition().fn(input, args, mockHandlers).toPromise();
expect(handleEsaggsRequest).toHaveBeenCalledWith(
expect.objectContaining({
diff --git a/src/plugins/data/server/search/expressions/esaggs.ts b/src/plugins/data/server/search/expressions/esaggs.ts
index bb22a491b157e..3a39276c8ed41 100644
--- a/src/plugins/data/server/search/expressions/esaggs.ts
+++ b/src/plugins/data/server/search/expressions/esaggs.ts
@@ -7,6 +7,8 @@
*/
import { get } from 'lodash';
+import { defer } from 'rxjs';
+import { switchMap } from 'rxjs/operators';
import { i18n } from '@kbn/i18n';
import { KibanaRequest, StartServicesAccessor } from 'src/core/server';
import {
@@ -36,45 +38,47 @@ export function getFunctionDefinition({
}): () => EsaggsExpressionFunctionDefinition {
return () => ({
...getEsaggsMeta(),
- async fn(
- input,
- args,
- { inspectorAdapters, abortSignal, getSearchSessionId, getKibanaRequest }
- ) {
- const kibanaRequest = getKibanaRequest ? getKibanaRequest() : null;
- if (!kibanaRequest) {
- throw new Error(
- i18n.translate('data.search.esaggs.error.kibanaRequest', {
- defaultMessage:
- 'A KibanaRequest is required to execute this search on the server. ' +
- 'Please provide a request object to the expression execution params.',
- })
- );
- }
+ fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId, getKibanaRequest }) {
+ return defer(async () => {
+ const kibanaRequest = getKibanaRequest ? getKibanaRequest() : null;
+ if (!kibanaRequest) {
+ throw new Error(
+ i18n.translate('data.search.esaggs.error.kibanaRequest', {
+ defaultMessage:
+ 'A KibanaRequest is required to execute this search on the server. ' +
+ 'Please provide a request object to the expression execution params.',
+ })
+ );
+ }
- const { aggs, indexPatterns, searchSource } = await getStartDependencies(kibanaRequest);
+ const { aggs, indexPatterns, searchSource } = await getStartDependencies(kibanaRequest);
- const indexPattern = await indexPatterns.create(args.index.value, true);
- const aggConfigs = aggs.createAggConfigs(
- indexPattern,
- args.aggs!.map((agg) => agg.value)
- );
+ const indexPattern = await indexPatterns.create(args.index.value, true);
+ const aggConfigs = aggs.createAggConfigs(
+ indexPattern,
+ args.aggs!.map((agg) => agg.value)
+ );
- aggConfigs.hierarchical = args.metricsAtAllLevels;
+ aggConfigs.hierarchical = args.metricsAtAllLevels;
- return await handleEsaggsRequest({
- abortSignal,
- aggs: aggConfigs,
- filters: get(input, 'filters', undefined),
- indexPattern,
- inspectorAdapters,
- partialRows: args.partialRows,
- query: get(input, 'query', undefined) as any,
- searchSessionId: getSearchSessionId(),
- searchSourceService: searchSource,
- timeFields: args.timeFields,
- timeRange: get(input, 'timeRange', undefined),
- });
+ return { aggConfigs, indexPattern, searchSource };
+ }).pipe(
+ switchMap(({ aggConfigs, indexPattern, searchSource }) =>
+ handleEsaggsRequest({
+ abortSignal,
+ aggs: aggConfigs,
+ filters: get(input, 'filters', undefined),
+ indexPattern,
+ inspectorAdapters,
+ partialRows: args.partialRows,
+ query: get(input, 'query', undefined) as any,
+ searchSessionId: getSearchSessionId(),
+ searchSourceService: searchSource,
+ timeFields: args.timeFields,
+ timeRange: get(input, 'timeRange', undefined),
+ })
+ )
+ );
},
});
}
diff --git a/src/plugins/expressions/public/loader.test.ts b/src/plugins/expressions/public/loader.test.ts
index 86477e53dc1a1..4c0ed842076c6 100644
--- a/src/plugins/expressions/public/loader.test.ts
+++ b/src/plugins/expressions/public/loader.test.ts
@@ -8,6 +8,7 @@
import { of } from 'rxjs';
import { first, skip, toArray } from 'rxjs/operators';
+import { TestScheduler } from 'rxjs/testing';
import { loader, ExpressionLoader } from './loader';
import { Observable } from 'rxjs';
import {
@@ -22,6 +23,8 @@ const { __getLastExecution, __getLastRenderMode } = require('./services');
const element: HTMLElement = null as any;
+let testScheduler: TestScheduler;
+
jest.mock('./services', () => {
let renderMode: RenderMode | undefined;
const renderers: Record = {
@@ -88,6 +91,10 @@ describe('execute helper function', () => {
describe('ExpressionLoader', () => {
const expressionString = 'demodata';
+ beforeEach(() => {
+ testScheduler = new TestScheduler((actual, expected) => expect(actual).toStrictEqual(expected));
+ });
+
describe('constructor', () => {
it('accepts expression string', () => {
const expressionLoader = new ExpressionLoader(element, expressionString, {});
@@ -130,6 +137,7 @@ describe('ExpressionLoader', () => {
const expressionLoader = new ExpressionLoader(element, 'var foo', {
variables: { foo: of(1, 2) },
partial: true,
+ throttle: 0,
});
const { result, partial } = await expressionLoader.data$.pipe(first()).toPromise();
@@ -137,6 +145,22 @@ describe('ExpressionLoader', () => {
expect(result).toBe(1);
});
+ it('throttles partial results', async () => {
+ testScheduler.run(({ cold, expectObservable }) => {
+ const expressionLoader = new ExpressionLoader(element, 'var foo', {
+ variables: { foo: cold('a 5ms b 5ms c 10ms d', { a: 1, b: 2, c: 3, d: 4 }) },
+ partial: true,
+ throttle: 20,
+ });
+
+ expectObservable(expressionLoader.data$).toBe('a 19ms c 2ms d', {
+ a: expect.objectContaining({ result: 1 }),
+ c: expect.objectContaining({ result: 3 }),
+ d: expect.objectContaining({ result: 4 }),
+ });
+ });
+ });
+
it('emits on loading$ on initial load and on updates', async () => {
const expressionLoader = new ExpressionLoader(element, expressionString, {});
const loadingPromise = expressionLoader.loading$.pipe(toArray()).toPromise();
diff --git a/src/plugins/expressions/public/loader.ts b/src/plugins/expressions/public/loader.ts
index a51ce35c68180..e5e63b044ad0b 100644
--- a/src/plugins/expressions/public/loader.ts
+++ b/src/plugins/expressions/public/loader.ts
@@ -6,8 +6,8 @@
* Side Public License, v 1.
*/
-import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
-import { filter, map, delay } from 'rxjs/operators';
+import { BehaviorSubject, Observable, Subject, Subscription, asyncScheduler, identity } from 'rxjs';
+import { filter, map, delay, throttleTime } from 'rxjs/operators';
import { defaults } from 'lodash';
import { UnwrapObservable } from '@kbn/utility-types';
import { Adapters } from '../../inspector/public';
@@ -145,7 +145,10 @@ export class ExpressionLoader {
.getData()
.pipe(
delay(0), // delaying until the next tick since we execute the expression in the constructor
- filter(({ partial }) => params.partial || !partial)
+ filter(({ partial }) => params.partial || !partial),
+ params.partial && params.throttle
+ ? throttleTime(params.throttle, asyncScheduler, { leading: true, trailing: true })
+ : identity
)
.subscribe((value) => this.dataSubject.next(value));
};
@@ -178,6 +181,7 @@ export class ExpressionLoader {
this.params.syncColors = params.syncColors;
this.params.debug = Boolean(params.debug);
this.params.partial = Boolean(params.partial);
+ this.params.throttle = Number(params.throttle ?? 1000);
this.params.inspectorAdapters = (params.inspectorAdapters ||
this.execution?.inspect()) as Adapters;
diff --git a/src/plugins/expressions/public/public.api.md b/src/plugins/expressions/public/public.api.md
index 55655cfc5d156..3aa902be5ba63 100644
--- a/src/plugins/expressions/public/public.api.md
+++ b/src/plugins/expressions/public/public.api.md
@@ -908,7 +908,6 @@ export interface IExpressionLoaderParams {
//
// (undocumented)
onRenderError?: RenderErrorHandlerFnType;
- // (undocumented)
partial?: boolean;
// Warning: (ae-forgotten-export) The symbol "RenderMode" needs to be exported by the entry point index.d.ts
//
@@ -920,6 +919,7 @@ export interface IExpressionLoaderParams {
searchSessionId?: string;
// (undocumented)
syncColors?: boolean;
+ throttle?: number;
// (undocumented)
uiState?: unknown;
// (undocumented)
diff --git a/src/plugins/expressions/public/types/index.ts b/src/plugins/expressions/public/types/index.ts
index 2375252e82784..a691aa31a75c1 100644
--- a/src/plugins/expressions/public/types/index.ts
+++ b/src/plugins/expressions/public/types/index.ts
@@ -48,7 +48,18 @@ export interface IExpressionLoaderParams {
renderMode?: RenderMode;
syncColors?: boolean;
hasCompatibleActions?: ExpressionRenderHandlerParams['hasCompatibleActions'];
+
+ /**
+ * The flag to toggle on emitting partial results.
+ * By default, the partial results are disabled.
+ */
partial?: boolean;
+
+ /**
+ * Throttling of partial results in milliseconds. 0 is disabling the throttling.
+ * By default, it equals 1000.
+ */
+ throttle?: number;
}
export interface ExpressionRenderError extends Error {