diff --git a/x-pack/platform/plugins/shared/dataset_quality/server/routes/data_streams/failed_docs/get_failed_docs.ts b/x-pack/platform/plugins/shared/dataset_quality/server/routes/data_streams/failed_docs/get_failed_docs.ts index 1facc8e7be540..caff754e59e21 100644 --- a/x-pack/platform/plugins/shared/dataset_quality/server/routes/data_streams/failed_docs/get_failed_docs.ts +++ b/x-pack/platform/plugins/shared/dataset_quality/server/routes/data_streams/failed_docs/get_failed_docs.ts @@ -7,82 +7,9 @@ import type { ElasticsearchClient } from '@kbn/core/server'; import { DataStreamDocsStat } from '../../../../common/api_types'; -import { FAILURE_STORE_SELECTOR } from '../../../../common/constants'; import { DataStreamType } from '../../../../common/types'; -import { - extractIndexNameFromBackingIndex, - streamPartsToIndexPattern, -} from '../../../../common/utils'; -import { createDatasetQualityESClient } from '../../../utils'; -import { DatasetQualityESClient } from '../../../utils/create_dataset_quality_es_client'; -import { rangeQuery } from '../../../utils/queries'; - -const SIZE_LIMIT = 10000; - -async function getPaginatedResults(options: { - datasetQualityESClient: DatasetQualityESClient; - index: string; - start: number; - end: number; - after?: { dataset: string }; - prevResults?: Record; -}) { - const { datasetQualityESClient, index, start, end, after, prevResults = {} } = options; - - const bool = { - filter: [...rangeQuery(start, end)], - }; - - const response = await datasetQualityESClient.search({ - index: `${index}${FAILURE_STORE_SELECTOR}`, - size: 0, - query: { - bool, - }, - aggs: { - datasets: { - composite: { - ...(after ? { after } : {}), - size: SIZE_LIMIT, - sources: [{ dataset: { terms: { field: '_index' } } }], - }, - }, - }, - }); - - const currResults = (response.aggregations?.datasets.buckets ?? []).reduce((acc, curr) => { - const datasetName = extractIndexNameFromBackingIndex(curr.key.dataset as string); - - return { - ...acc, - [datasetName]: (acc[datasetName] ?? 0) + curr.doc_count, - }; - }, {} as Record); - - const results = { - ...prevResults, - ...currResults, - }; - - if ( - response.aggregations?.datasets.after_key && - response.aggregations?.datasets.buckets.length === SIZE_LIMIT - ) { - return getPaginatedResults({ - datasetQualityESClient, - index, - start, - end, - after: - (response.aggregations?.datasets.after_key as { - dataset: string; - }) || after, - prevResults: results, - }); - } - - return results; -} +import { streamPartsToIndexPattern } from '../../../../common/utils'; +import { getAggregatedDatasetPaginatedResults } from '../get_dataset_aggregated_paginated_results'; export async function getFailedDocsPaginated(options: { esClient: ElasticsearchClient; @@ -102,17 +29,10 @@ export async function getFailedDocsPaginated(options: { }) ); - const datasetQualityESClient = createDatasetQualityESClient(esClient); - - const datasets = await getPaginatedResults({ - datasetQualityESClient, + return await getAggregatedDatasetPaginatedResults({ + esClient, index: datasetNames.join(','), start, end, }); - - return Object.entries(datasets).map(([dataset, count]) => ({ - dataset, - count, - })); } diff --git a/x-pack/platform/plugins/shared/dataset_quality/server/routes/data_streams/get_dataset_aggregated_paginated_results.ts b/x-pack/platform/plugins/shared/dataset_quality/server/routes/data_streams/get_dataset_aggregated_paginated_results.ts index fe9af4dda94a1..2935e2d24592a 100644 --- a/x-pack/platform/plugins/shared/dataset_quality/server/routes/data_streams/get_dataset_aggregated_paginated_results.ts +++ b/x-pack/platform/plugins/shared/dataset_quality/server/routes/data_streams/get_dataset_aggregated_paginated_results.ts @@ -7,14 +7,13 @@ import { QueryDslBoolQuery } from '@elastic/elasticsearch/lib/api/types'; import type { ElasticsearchClient } from '@kbn/core/server'; +import { extractIndexNameFromBackingIndex } from '../../../common/utils'; import { DataStreamDocsStat } from '../../../common/api_types'; import { createDatasetQualityESClient } from '../../utils'; import { rangeQuery } from '../../utils/queries'; interface Dataset { - type: string; dataset: string; - namespace: string; } const SIZE_LIMIT = 10000; @@ -37,11 +36,7 @@ export async function getAggregatedDatasetPaginatedResults(options: { composite: { ...(afterKey ? { after: afterKey } : {}), size: SIZE_LIMIT, - sources: [ - { type: { terms: { field: 'data_stream.type' } } }, - { dataset: { terms: { field: 'data_stream.dataset' } } }, - { namespace: { terms: { field: 'data_stream.namespace' } } }, - ], + sources: [{ dataset: { terms: { field: '_index' } } }], }, }, }); @@ -65,7 +60,7 @@ export async function getAggregatedDatasetPaginatedResults(options: { const currResults = response.aggregations?.datasets.buckets.map((bucket) => ({ - dataset: `${bucket.key.type}-${bucket.key.dataset}-${bucket.key.namespace}`, + dataset: bucket.key.dataset as string, count: bucket.doc_count, })) ?? []; @@ -82,13 +77,17 @@ export async function getAggregatedDatasetPaginatedResults(options: { end, after: (response.aggregations?.datasets.after_key as { - type: string; dataset: string; - namespace: string; }) || after, prevResults: results, }); } - return results; + return Object.entries( + results.reduce((acc, curr) => { + const dataset = extractIndexNameFromBackingIndex(curr.dataset); + acc[dataset] = (acc[dataset] ?? 0) + curr.count; + return acc; + }, {} as Record) + ).map(([dataset, count]) => ({ dataset, count })); }