Skip to content

Commit

Permalink
[ML] AIOps: Chunk groups of field candidates into single queries for …
Browse files Browse the repository at this point in the history
…top items and histograms. (#189155)

## Summary

Follow up to #188137.
Part of #187684.

- Groups chunks of terms aggregations for field candidates when running
the fallback to get top terms instead of significant terms when either
baseline or deviation time range contains no documents.
- Groups chunks of histogram aggregations for the data for the mini
histogram charts. Previously we reused the code for the transform/dfa
data grid mini histograms for this, it's now refactored to an optimized
version for log rate analysis.
- Adds `withSpan` wrappers to group log rate analysis steps for APM
(magenta bars in the "after" screenshot).
- Removes some no longer used code from API version 1.
- Disables support for `boolean` fields, it doesn't work properly with
the `frequent_item_sets` aggregations.
- Fixes the loading step sizes to correct the loading progress bar going
from 0-100%.

Before:

<img width="480" alt="image"
src="https://github.com/user-attachments/assets/dc316166-8f2b-4b0f-84a4-6813f69cd10a">

After:

<img width="500" alt="image"
src="https://github.com/user-attachments/assets/4c532c76-42a0-4321-a261-3b7cf9bbd361">


### Checklist

Delete any items that are not applicable to this PR.

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
  • Loading branch information
walterra authored Aug 5, 2024
1 parent a00085e commit 22ac46c
Show file tree
Hide file tree
Showing 29 changed files with 780 additions and 520 deletions.
3 changes: 2 additions & 1 deletion x-pack/packages/ml/agg_utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ export { numberValidator } from './src/validate_number';

export type {
FieldsForHistograms,
NumericDataItem,
NumericChartData,
NumericHistogramField,
} from './src/fetch_histograms_for_fields';
export { isMultiBucketAggregate } from './src/is_multi_bucket_aggregate';
export { isSignificantItem } from './src/type_guards';
export { isSignificantItem, isSignificantItemGroup } from './src/type_guards';
export { SIGNIFICANT_ITEM_TYPE } from './src/types';
export type {
AggCardinality,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ interface AggHistogram {
histogram: {
field: string;
interval: number;
min_doc_count?: number;
extended_bounds?: {
min: number;
max: number;
};
};
}

Expand All @@ -45,7 +50,7 @@ interface AggTerms {
* Represents an item in numeric data.
* @interface
*/
interface NumericDataItem {
export interface NumericDataItem {
/**
* The numeric key.
*/
Expand Down
12 changes: 10 additions & 2 deletions x-pack/packages/ml/agg_utils/src/type_guards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

import { isPopulatedObject } from '@kbn/ml-is-populated-object';

import type { SignificantItem } from './types';
import type { SignificantItem, SignificantItemGroup } from './types';

/**
* Type guard for a significant item.
* Note this is used as a custom type within Log Rate Analysis
* for a p-value based variant, not a generic significant terms
* aggregation type.
* @param arg The unknown type to be evaluated
* @returns whether arg is of type SignificantItem
* @returns Return whether arg is of type SignificantItem
*/
export function isSignificantItem(arg: unknown): arg is SignificantItem {
return isPopulatedObject(arg, [
Expand All @@ -32,3 +32,11 @@ export function isSignificantItem(arg: unknown): arg is SignificantItem {
'normalizedScore',
]);
}
/**
* Type guard to check if the given argument is a SignificantItemGroup.
* @param arg The unknown type to be evaluated
* @returns Return whether arg is of type SignificantItemGroup
*/
export function isSignificantItemGroup(arg: unknown): arg is SignificantItemGroup {
return isPopulatedObject(arg, ['id', 'group', 'docCount', 'pValue']);
}
16 changes: 2 additions & 14 deletions x-pack/packages/ml/agg_utils/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,25 +180,13 @@ interface SignificantItemHistogramItemBase {
}

/**
* @deprecated since version 2 of internal log rate analysis REST API endpoint
* Represents a data item in a significant term histogram.
*/
interface SignificantItemHistogramItemV1 extends SignificantItemHistogramItemBase {
/** The document count for this item in the significant term context. */
doc_count_significant_term: number;
}

interface SignificantItemHistogramItemV2 extends SignificantItemHistogramItemBase {
export interface SignificantItemHistogramItem extends SignificantItemHistogramItemBase {
/** The document count for this histogram item in the significant item context. */
doc_count_significant_item: number;
}

/**
* Represents a data item in a significant term histogram.
*/
export type SignificantItemHistogramItem =
| SignificantItemHistogramItemV1
| SignificantItemHistogramItemV2;

/**
* Represents histogram data for a field/value pair.
* @interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,27 @@
import { createRandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';

import { paramsMock } from './__mocks__/params_match_all';
import { getBaselineOrDeviationFilter, getCategoryRequest } from './fetch_categories';
import {
getBaselineOrDeviationFilter,
getCategoryRequest,
isMsearchResponseItemWithAggs,
} from './fetch_categories';

describe('isMsearchResponseItemWithAggs', () => {
it('returns true if the argument is an MsearchMultiSearchItem with aggregations', () => {
const arg = {
aggregations: {},
};

expect(isMsearchResponseItemWithAggs(arg)).toBe(true);
});

it('returns false if the argument is not an MsearchMultiSearchItem with aggregations', () => {
const arg = {};

expect(isMsearchResponseItemWithAggs(arg)).toBe(false);
});
});

describe('getBaselineOrDeviationFilter', () => {
it('returns a filter that matches both baseline and deviation time range', () => {
Expand Down
113 changes: 61 additions & 52 deletions x-pack/packages/ml/aiops_log_rate_analysis/queries/fetch_categories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';

import type { ElasticsearchClient } from '@kbn/core/server';
import type { Logger } from '@kbn/logging';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import {
createRandomSamplerWrapper,
type RandomSamplerWrapper,
Expand All @@ -23,6 +24,10 @@ import type { AiopsLogRateAnalysisSchema } from '../api/schema';

import { getQueryWithParams } from './get_query_with_params';

export const isMsearchResponseItemWithAggs = (
arg: unknown
): arg is estypes.MsearchMultiSearchItem => isPopulatedObject(arg, ['aggregations']);

// Filter that includes docs from both the baseline and deviation time range.
export const getBaselineOrDeviationFilter = (
params: AiopsLogRateAnalysisSchema
Expand Down Expand Up @@ -111,21 +116,21 @@ export const fetchCategories = async (

const result: FetchCategoriesResponse[] = [];

const settledPromises = await Promise.allSettled(
fieldNames.map((fieldName) => {
const request = getCategoryRequest(params, fieldName, randomSamplerWrapper);
return esClient.search(request, {
signal: abortSignal,
maxRetries: 0,
});
})
);
const searches: estypes.MsearchRequestItem[] = fieldNames.flatMap((fieldName) => [
{ index: params.index },
getCategoryRequest(params, fieldName, randomSamplerWrapper)
.body as estypes.MsearchMultisearchBody,
]);

function reportError(fieldName: string, error: unknown) {
let mSearchResponse;

try {
mSearchResponse = await esClient.msearch({ searches }, { signal: abortSignal, maxRetries: 0 });
} catch (error) {
if (!isRequestAbortedError(error)) {
if (logger) {
logger.error(
`Failed to fetch category aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
`Failed to fetch category aggregation for field names ${fieldNames.join()}, got: \n${JSON.stringify(
error,
null,
2
Expand All @@ -134,55 +139,59 @@ export const fetchCategories = async (
}

if (emitError) {
emitError(`Failed to fetch category aggregation for fieldName "${fieldName}".`);
emitError(`Failed to fetch category aggregation for field names "${fieldNames.join()}".`);
}
}
return result;
}

for (const [index, settledPromise] of settledPromises.entries()) {
for (const [index, resp] of mSearchResponse.responses.entries()) {
const fieldName = fieldNames[index];

if (settledPromise.status === 'rejected') {
reportError(fieldName, settledPromise.reason);
// Still continue the analysis even if individual category queries fail.
continue;
}

const resp = settledPromise.value;
const { aggregations } = resp;
if (isMsearchResponseItemWithAggs(resp)) {
const { aggregations } = resp;

const {
categories: { buckets },
} = randomSamplerWrapper.unwrap(
aggregations as unknown as Record<string, estypes.AggregationsAggregate>
) as CategoriesAgg;

const categories: Category[] = buckets.map((b) => {
const sparkline =
b.sparkline === undefined
? {}
: b.sparkline.buckets.reduce<Record<number, number>>((acc2, cur2) => {
acc2[cur2.key] = cur2.doc_count;
return acc2;
}, {});

return {
key: b.key,
count: b.doc_count,
examples: b.examples.hits.hits.map((h) => get(h._source, fieldName)),
sparkline,
regex: b.regex,
};
});
result.push({
categories,
});
} else {
if (logger) {
logger.error(
`Failed to fetch category aggregation for field "${fieldName}", got: \n${JSON.stringify(
resp,
null,
2
)}`
);
}

if (aggregations === undefined) {
reportError(fieldName, resp);
// Still continue the analysis even if individual category queries fail.
continue;
if (emitError) {
emitError(`Failed to fetch category aggregation for field "${fieldName}".`);
}
}

const {
categories: { buckets },
} = randomSamplerWrapper.unwrap(
aggregations as unknown as Record<string, estypes.AggregationsAggregate>
) as CategoriesAgg;

const categories: Category[] = buckets.map((b) => {
const sparkline =
b.sparkline === undefined
? {}
: b.sparkline.buckets.reduce<Record<number, number>>((acc2, cur2) => {
acc2[cur2.key] = cur2.doc_count;
return acc2;
}, {});

return {
key: b.key,
count: b.doc_count,
examples: b.examples.hits.hits.map((h) => get(h._source, fieldName)),
sparkline,
regex: b.regex,
};
});
result.push({
categories,
});
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,7 @@ export const fetchCategoryCounts = async (
let mSearchresponse;

try {
mSearchresponse = await esClient.msearch(
{ searches },
{
signal: abortSignal,
maxRetries: 0,
}
);
mSearchresponse = await esClient.msearch({ searches }, { signal: abortSignal, maxRetries: 0 });
} catch (error) {
if (!isRequestAbortedError(error)) {
if (logger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ describe('fetchFieldCandidates', () => {
'event.type',
'fileset.name',
'host.architecture',
'host.containerized',
'host.hostname',
'host.ip',
'host.mac',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ export const TEXT_FIELD_SAFE_LIST = ['message', 'error.message'];
export const SUPPORTED_ES_FIELD_TYPES = [
ES_FIELD_TYPES.KEYWORD,
ES_FIELD_TYPES.IP,
ES_FIELD_TYPES.BOOLEAN,
// Disabled boolean support because it causes problems with the `frequent_item_sets` aggregation
// ES_FIELD_TYPES.BOOLEAN,
];

export const SUPPORTED_ES_FIELD_TYPES_TEXT = [ES_FIELD_TYPES.TEXT, ES_FIELD_TYPES.MATCH_ONLY_TEXT];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export const fetchIndexInfo = async ({

return {
keywordFieldCandidates: fieldCandidates?.selectedKeywordFieldCandidates.sort() ?? [],
textFieldCandidates: fieldCandidates?.textFieldCandidates.sort() ?? [],
textFieldCandidates: fieldCandidates?.selectedTextFieldCandidates.sort() ?? [],
baselineTotalDocCount,
deviationTotalDocCount,
zeroDocsFallback: baselineTotalDocCount === 0 || deviationTotalDocCount === 0,
Expand Down
Loading

0 comments on commit 22ac46c

Please sign in to comment.