# [8.x] [Dataset quality] 🐞 Rely solely on _index instead of data_stream properties (elastic#210329) (elastic#210533)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Dataset quality] 🐞 Rely solely on _index instead of data_stream properties (elastic#210329)](https://github.com/elastic/kibana/pull/210329)


### Questions?

Please refer to the [Backport tool documentation](https://github.com/sqren/backport).

<!--BACKPORT [{"author":{"name":"Yngrid
Coello","email":"yngrid.coello@elastic.co"},"sourceCommit":{"committedDate":"2025-02-11T10:48:45Z","message":"[Dataset
quality] 🐞 Rely solely on _index instead of data_stream properties
(elastic#210329)\n\nCloses
https://github.com/elastic/logs-dev/issues/192.\r\n\r\n##
Background\r\n\r\nThis have been an long running issue within dataset
quality page which\r\nbecame more noticeable when introducing failure
store. Before this\r\nchange `Dataset quality details` page was already
solely relying on\r\n`_index` instead of filtering documents using
`data_stream` properties\r\nwhile the main page was filtering out the
documents.\r\n\r\n### Before
\r\n\r\n\r\nhttps://github.com/user-attachments/assets/02d14cb9-81a6-4f61-a199-5d1e55443a20\r\n\r\n###
After\r\n\r\n\r\nhttps://github.com/user-attachments/assets/09a4e523-b927-4147-99d1-6ceff40f1027","sha":"d26f9ffbb60a7ec6327920ddb520320ed83241d3","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","backport:prev-minor","backport:prev-major","v9.1.0"],"title":"[Dataset
quality] 🐞 Rely solely on _index instead of data_stream
properties","number":210329,"url":"https://github.com/elastic/kibana/pull/210329","mergeCommit":{"message":"[Dataset
quality] 🐞 Rely solely on _index instead of data_stream properties
(elastic#210329)\n\nCloses
https://github.com/elastic/logs-dev/issues/192.\r\n\r\n##
Background\r\n\r\nThis have been an long running issue within dataset
quality page which\r\nbecame more noticeable when introducing failure
store. Before this\r\nchange `Dataset quality details` page was already
solely relying on\r\n`_index` instead of filtering documents using
`data_stream` properties\r\nwhile the main page was filtering out the
documents.\r\n\r\n### Before
\r\n\r\n\r\nhttps://github.com/user-attachments/assets/02d14cb9-81a6-4f61-a199-5d1e55443a20\r\n\r\n###
After\r\n\r\n\r\nhttps://github.com/user-attachments/assets/09a4e523-b927-4147-99d1-6ceff40f1027","sha":"d26f9ffbb60a7ec6327920ddb520320ed83241d3"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/210329","number":210329,"mergeCommit":{"message":"[Dataset
quality] 🐞 Rely solely on _index instead of data_stream properties
(elastic#210329)\n\nCloses
https://github.com/elastic/logs-dev/issues/192.\r\n\r\n##
Background\r\n\r\nThis have been an long running issue within dataset
quality page which\r\nbecame more noticeable when introducing failure
store. Before this\r\nchange `Dataset quality details` page was already
solely relying on\r\n`_index` instead of filtering documents using
`data_stream` properties\r\nwhile the main page was filtering out the
documents.\r\n\r\n### Before
\r\n\r\n\r\nhttps://github.com/user-attachments/assets/02d14cb9-81a6-4f61-a199-5d1e55443a20\r\n\r\n###
After\r\n\r\n\r\nhttps://github.com/user-attachments/assets/09a4e523-b927-4147-99d1-6ceff40f1027","sha":"d26f9ffbb60a7ec6327920ddb520320ed83241d3"}}]}]
BACKPORT-->

Co-authored-by: Yngrid Coello <yngrid.coello@elastic.co>
kibanamachine and yngrdyn authored Feb 11, 2025
1 parent 31847a4 commit c5954f0
Showing 2 changed files with 14 additions and 95 deletions.
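A document's `_index` is the concrete backing index it lives in, not its data stream's name, so bucketing on `_index` requires folding backing indices back into their data streams afterwards. A minimal sketch of that mapping, assuming the conventional `.ds-`/`.fs-` backing-index naming scheme (the plugin's real implementation is `extractIndexNameFromBackingIndex` in `common/utils`):

```ts
// Illustrative sketch only, not the plugin's actual implementation.
// Assumes Elasticsearch's conventional backing-index naming:
//   .ds-<stream-name>-<yyyy.MM.dd>-<generation>   (regular backing indices)
//   .fs-<stream-name>-<yyyy.MM.dd>-<generation>   (failure-store indices)
const BACKING_INDEX_RE = /^\.(?:ds|fs)-(.+)-\d{4}\.\d{2}\.\d{2}-\d+$/;

function extractIndexNameFromBackingIndexSketch(index: string): string {
  const match = index.match(BACKING_INDEX_RE);
  // Anything that is not a backing index passes through unchanged.
  return match ? match[1] : index;
}

extractIndexNameFromBackingIndexSketch('.ds-logs-nginx.access-default-2025.02.11-000001');
// => 'logs-nginx.access-default'
```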
File 1 of 2 — the failed-docs stats module (exports `getFailedDocsPaginated`):

```diff
@@ -7,82 +7,9 @@
 import type { ElasticsearchClient } from '@kbn/core/server';
 import { DataStreamDocsStat } from '../../../../common/api_types';
 import { FAILURE_STORE_SELECTOR } from '../../../../common/constants';
 import { DataStreamType } from '../../../../common/types';
-import {
-  extractIndexNameFromBackingIndex,
-  streamPartsToIndexPattern,
-} from '../../../../common/utils';
-import { createDatasetQualityESClient } from '../../../utils';
-import { DatasetQualityESClient } from '../../../utils/create_dataset_quality_es_client';
-import { rangeQuery } from '../../../utils/queries';
-
-const SIZE_LIMIT = 10000;
-
-async function getPaginatedResults(options: {
-  datasetQualityESClient: DatasetQualityESClient;
-  index: string;
-  start: number;
-  end: number;
-  after?: { dataset: string };
-  prevResults?: Record<string, number>;
-}) {
-  const { datasetQualityESClient, index, start, end, after, prevResults = {} } = options;
-
-  const bool = {
-    filter: [...rangeQuery(start, end)],
-  };
-
-  const response = await datasetQualityESClient.search({
-    index: `${index}${FAILURE_STORE_SELECTOR}`,
-    size: 0,
-    query: {
-      bool,
-    },
-    aggs: {
-      datasets: {
-        composite: {
-          ...(after ? { after } : {}),
-          size: SIZE_LIMIT,
-          sources: [{ dataset: { terms: { field: '_index' } } }],
-        },
-      },
-    },
-  });
-
-  const currResults = (response.aggregations?.datasets.buckets ?? []).reduce((acc, curr) => {
-    const datasetName = extractIndexNameFromBackingIndex(curr.key.dataset as string);
-
-    return {
-      ...acc,
-      [datasetName]: (acc[datasetName] ?? 0) + curr.doc_count,
-    };
-  }, {} as Record<string, number>);
-
-  const results = {
-    ...prevResults,
-    ...currResults,
-  };
-
-  if (
-    response.aggregations?.datasets.after_key &&
-    response.aggregations?.datasets.buckets.length === SIZE_LIMIT
-  ) {
-    return getPaginatedResults({
-      datasetQualityESClient,
-      index,
-      start,
-      end,
-      after:
-        (response.aggregations?.datasets.after_key as {
-          dataset: string;
-        }) || after,
-      prevResults: results,
-    });
-  }
-
-  return results;
-}
+import { streamPartsToIndexPattern } from '../../../../common/utils';
+import { getAggregatedDatasetPaginatedResults } from '../get_dataset_aggregated_paginated_results';

 export async function getFailedDocsPaginated(options: {
   esClient: ElasticsearchClient;
@@ -102,17 +29,10 @@ export async function getFailedDocsPaginated(options: {
     })
   );

-  const datasetQualityESClient = createDatasetQualityESClient(esClient);
-
-  const datasets = await getPaginatedResults({
-    datasetQualityESClient,
+  return await getAggregatedDatasetPaginatedResults({
+    esClient,
     index: datasetNames.join(','),
     start,
     end,
   });
-
-  return Object.entries(datasets).map(([dataset, count]) => ({
-    dataset,
-    count,
-  }));
 }
```
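For context, the removed helper targeted the failure store by appending `FAILURE_STORE_SELECTOR` to the `index` expression it searched. The constant's value is not shown in this diff; assuming it is the `::failures` selector suffix accepted by Elasticsearch index expressions, the composed expression looks roughly like this:

```ts
// Assumption: FAILURE_STORE_SELECTOR is the '::failures' suffix that
// Elasticsearch index expressions accept for targeting a data stream's
// failure store; its actual value lives in the plugin's common/constants.
const FAILURE_STORE_SELECTOR = '::failures';

// Hypothetical patterns, as streamPartsToIndexPattern might produce them.
const datasetNames = ['logs-*-*', 'metrics-*-*'];

// Mirrors the removed helper's `${index}${FAILURE_STORE_SELECTOR}` search target.
const searchIndex = `${datasetNames.join(',')}${FAILURE_STORE_SELECTOR}`;
// => 'logs-*-*,metrics-*-*::failures'
```

With the helper gone, the failed-docs path hands its index expression to the shared `getAggregatedDatasetPaginatedResults` (changed below) instead of running its own composite-aggregation loop.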
File 2 of 2 — the shared helper `get_dataset_aggregated_paginated_results` (exports `getAggregatedDatasetPaginatedResults`):

```diff
@@ -7,14 +7,13 @@
 import { QueryDslBoolQuery } from '@elastic/elasticsearch/lib/api/types';
 import type { ElasticsearchClient } from '@kbn/core/server';
+import { extractIndexNameFromBackingIndex } from '../../../common/utils';
 import { DataStreamDocsStat } from '../../../common/api_types';
 import { createDatasetQualityESClient } from '../../utils';
 import { rangeQuery } from '../../utils/queries';

 interface Dataset {
-  type: string;
   dataset: string;
-  namespace: string;
 }

 const SIZE_LIMIT = 10000;
@@ -37,11 +36,7 @@ export async function getAggregatedDatasetPaginatedResults(options: {
       composite: {
         ...(afterKey ? { after: afterKey } : {}),
         size: SIZE_LIMIT,
-        sources: [
-          { type: { terms: { field: 'data_stream.type' } } },
-          { dataset: { terms: { field: 'data_stream.dataset' } } },
-          { namespace: { terms: { field: 'data_stream.namespace' } } },
-        ],
+        sources: [{ dataset: { terms: { field: '_index' } } }],
       },
     },
   });
@@ -65,7 +60,7 @@ export async function getAggregatedDatasetPaginatedResults(options: {
   const currResults =
     response.aggregations?.datasets.buckets.map((bucket) => ({
-      dataset: `${bucket.key.type}-${bucket.key.dataset}-${bucket.key.namespace}`,
+      dataset: bucket.key.dataset as string,
       count: bucket.doc_count,
     })) ?? [];
@@ -82,13 +77,17 @@ export async function getAggregatedDatasetPaginatedResults(options: {
       end,
       after:
         (response.aggregations?.datasets.after_key as {
-          type: string;
           dataset: string;
-          namespace: string;
         }) || after,
       prevResults: results,
     });
   }

-  return results;
+  return Object.entries(
+    results.reduce((acc, curr) => {
+      const dataset = extractIndexNameFromBackingIndex(curr.dataset);
+      acc[dataset] = (acc[dataset] ?? 0) + curr.count;
+      return acc;
+    }, {} as Record<string, number>)
+  ).map(([dataset, count]) => ({ dataset, count }));
 }
```
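After this change, the shared helper buckets every page on `_index` and regroups only at the end. Roughly, each page it requests looks like the sketch below; that `rangeQuery(start, end)` expands to an `@timestamp` range filter is an assumption, since its implementation is not part of this diff:

```ts
// Rough shape of the search the helper issues (illustrative, not verbatim).
// Assumption: rangeQuery(start, end) produces an @timestamp range filter.
const start = Date.now() - 24 * 60 * 60 * 1000; // e.g. last 24 hours
const end = Date.now();
const SIZE_LIMIT = 10000;

const searchRequest = {
  index: 'logs-*-*', // comma-separated patterns, optionally with a selector suffix
  size: 0,
  query: {
    bool: { filter: [{ range: { '@timestamp': { gte: start, lte: end } } }] },
  },
  aggs: {
    datasets: {
      composite: {
        // Follow-up pages add: after: <previous response's after_key>
        size: SIZE_LIMIT,
        // One bucket per backing index; folded into per-dataset counts afterwards.
        sources: [{ dataset: { terms: { field: '_index' } } }],
      },
    },
  },
};
```

Each page returns up to `SIZE_LIMIT` buckets; the helper recurses with `after` set to the previous response's `after_key` until a page comes back smaller than `SIZE_LIMIT`, and only then folds the backing-index buckets into per-dataset counts.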
