Skip to content

Commit

Permalink
[ML] Fix time range query in the Anomaly detection alert execution (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
darnautov authored Mar 8, 2021
1 parent d5e4a2a commit 6b26d19
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 84 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugins/ml/common/types/alerts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export type TopHitsResultsKeys = 'top_record_hits' | 'top_bucket_hits' | 'top_in

export interface AlertExecutionResult {
count: number;
key: number;
key?: number;
alertInstanceKey: string;
isInterim: boolean;
jobIds: string[];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ const MlAnomalyAlertTrigger: FC<MlAnomalyAlertTriggerProps> = ({
// Set defaults
severity: ANOMALY_THRESHOLD.CRITICAL,
resultType: ANOMALY_RESULT_TYPE.BUCKET,
includeInterim: true,
includeInterim: false,
// Preserve job selection
jobSelection,
});
Expand Down
159 changes: 90 additions & 69 deletions x-pack/plugins/ml/server/lib/alerts/alerting_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ import {
import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type';
import { MlJobsResponse } from '../../../common/types/job_service';
import { resolveBucketSpanInSeconds } from '../../../common/util/job_utils';
import { isDefined } from '../../../common/types/guards';

type AggResultsResponse = { key?: number } & {
[key in PreviewResultsKeys]: {
doc_count: number;
} & {
[hitsKey in TopHitsResultsKeys]: {
hits: { hits: any[] };
};
};
};

/**
* Alerting related server-side methods
Expand Down Expand Up @@ -253,6 +264,51 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
return source.job_id;
};

const getResultsFormatter = (resultType: AnomalyResultType) => {
const resultsLabel = getAggResultsLabel(resultType);
return (v: AggResultsResponse): AlertExecutionResult | undefined => {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
if (aggTypeResults.doc_count === 0) {
return;
}

const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;

const topAnomaly = requestedAnomalies[0];
const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);

return {
count: aggTypeResults.doc_count,
key: v.key,
alertInstanceKey,
jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
isInterim: requestedAnomalies.some((h) => h._source.is_interim),
timestamp: topAnomaly._source.timestamp,
timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
score: topAnomaly.fields.score[0],
bucketRange: {
start: topAnomaly.fields.start[0],
end: topAnomaly.fields.end[0],
},
topRecords: v.record_results.top_record_hits.hits.hits.map((h) => {
return {
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
};
}) as RecordAnomalyAlertDoc[],
topInfluencers: v.influencer_results.top_influencer_hits.hits.hits.map((h) => {
return {
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
};
}) as InfluencerAnomalyAlertDoc[],
};
};
};

/**
* Builds a request body
* @param params - Alert params
Expand Down Expand Up @@ -325,17 +381,19 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
],
},
},
aggs: {
alerts_over_time: {
date_histogram: {
field: 'timestamp',
fixed_interval: lookBackTimeInterval,
// Ignore empty buckets
min_doc_count: 1,
},
aggs: getResultTypeAggRequest(params.resultType as AnomalyResultType, params.severity),
},
},
aggs: previewTimeInterval
? {
alerts_over_time: {
date_histogram: {
field: 'timestamp',
fixed_interval: lookBackTimeInterval,
// Ignore empty buckets
min_doc_count: 1,
},
aggs: getResultTypeAggRequest(params.resultType, params.severity),
},
}
: getResultTypeAggRequest(params.resultType, params.severity),
};

const response = await mlClient.anomalySearch(
Expand All @@ -345,67 +403,30 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
jobIds
);

const result = response.body.aggregations as {
alerts_over_time: {
buckets: Array<
{
doc_count: number;
key: number;
key_as_string: string;
} & {
[key in PreviewResultsKeys]: {
doc_count: number;
} & {
[hitsKey in TopHitsResultsKeys]: {
hits: { hits: any[] };
};
};
}
>;
};
};

const resultsLabel = getAggResultsLabel(params.resultType as AnomalyResultType);
const result = response.body.aggregations;

return (
result.alerts_over_time.buckets
// Filter out empty buckets
.filter((v) => v.doc_count > 0 && v[resultsLabel.aggGroupLabel].doc_count > 0)
// Map response
.map((v) => {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;
const resultsLabel = getAggResultsLabel(params.resultType);

const topAnomaly = requestedAnomalies[0];
const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);
const formatter = getResultsFormatter(params.resultType);

return {
count: aggTypeResults.doc_count,
key: v.key,
alertInstanceKey,
jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
isInterim: requestedAnomalies.some((h) => h._source.is_interim),
timestamp: topAnomaly._source.timestamp,
timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
score: topAnomaly.fields.score[0],
bucketRange: {
start: topAnomaly.fields.start[0],
end: topAnomaly.fields.end[0],
},
topRecords: v.record_results.top_record_hits.hits.hits.map((h) => ({
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
})) as RecordAnomalyAlertDoc[],
topInfluencers: v.influencer_results.top_influencer_hits.hits.hits.map((h) => ({
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
})) as InfluencerAnomalyAlertDoc[],
return (previewTimeInterval
? (result as {
alerts_over_time: {
buckets: Array<
{
doc_count: number;
key: number;
key_as_string: string;
} & AggResultsResponse
>;
};
})
);
}).alerts_over_time.buckets
// Filter out empty buckets
.filter((v) => v.doc_count > 0 && v[resultsLabel.aggGroupLabel].doc_count > 0)
// Map response
.map(formatter)
: [formatter(result as AggResultsResponse)]
).filter(isDefined);
};

/**
Expand Down Expand Up @@ -510,7 +531,7 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
const result = res[0];
if (!result) return;

const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType as AnomalyResultType);
const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType);

const executionResult = {
...result,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ export function registerAnomalyDetectionAlertType({
{
name: 'timestamp',
description: i18n.translate('xpack.ml.alertContext.timestampDescription', {
defaultMessage: 'Timestamp of the anomaly',
defaultMessage: 'The bucket timestamp of the anomaly',
}),
},
{
name: 'timestampIso8601',
description: i18n.translate('xpack.ml.alertContext.timestampIso8601Description', {
defaultMessage: 'Time in ISO8601 format',
defaultMessage: 'The bucket time of the anomaly in ISO8601 format',
}),
},
{
name: 'jobIds',
description: i18n.translate('xpack.ml.alertContext.jobIdsDescription', {
defaultMessage: 'List of job IDs triggered the alert instance',
defaultMessage: 'List of job IDs that triggered the alert instance',
}),
},
{
Expand All @@ -87,7 +87,7 @@ export function registerAnomalyDetectionAlertType({
{
name: 'score',
description: i18n.translate('xpack.ml.alertContext.scoreDescription', {
defaultMessage: 'Anomaly score',
defaultMessage: 'Anomaly score at the time of the notification action',
}),
},
{
Expand All @@ -109,14 +109,6 @@ export function registerAnomalyDetectionAlertType({
}),
useWithTripleBracesInTemplates: true,
},
// TODO remove when https://github.com/elastic/kibana/pull/90525 is merged
{
name: 'kibanaBaseUrl',
description: i18n.translate('xpack.ml.alertContext.kibanaBasePathUrlDescription', {
defaultMessage: 'Kibana base path',
}),
useWithTripleBracesInTemplates: true,
},
],
},
producer: PLUGIN_ID,
Expand Down
7 changes: 6 additions & 1 deletion x-pack/plugins/ml/server/routes/schemas/alerting_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import { schema, TypeOf } from '@kbn/config-schema';
import { i18n } from '@kbn/i18n';
import { ALERT_PREVIEW_SAMPLE_SIZE } from '../../../common/constants/alerts';
import { ANOMALY_RESULT_TYPE } from '../../../common/constants/anomalies';

export const mlAnomalyDetectionAlertParams = schema.object({
jobSelection: schema.object(
Expand All @@ -26,7 +27,11 @@ export const mlAnomalyDetectionAlertParams = schema.object({
}
),
severity: schema.number(),
resultType: schema.string(),
resultType: schema.oneOf([
schema.literal(ANOMALY_RESULT_TYPE.RECORD),
schema.literal(ANOMALY_RESULT_TYPE.BUCKET),
schema.literal(ANOMALY_RESULT_TYPE.INFLUENCER),
]),
includeInterim: schema.boolean({ defaultValue: true }),
});

Expand Down

0 comments on commit 6b26d19

Please sign in to comment.