Skip to content

Commit

Permalink
[ML] Refactor core logic to Results model
Browse files Browse the repository at this point in the history
  • Loading branch information
qn895 committed Aug 10, 2020
1 parent a35ec15 commit abe77e7
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 110 deletions.
109 changes: 107 additions & 2 deletions x-pack/plugins/ml/server/models/results_service/results_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import { buildAnomalyTableItems } from './build_anomaly_table_items';
import { ML_RESULTS_INDEX_PATTERN } from '../../../common/constants/index_patterns';
import { ANOMALIES_TABLE_DEFAULT_QUERY_SIZE } from '../../../common/constants/search';
import { getPartitionFieldsValuesFactory } from './get_partition_fields_values';
import { AnomaliesTableRecord, AnomalyRecordDoc } from '../../../common/types/anomalies';
import {
AnomaliesTableRecord,
AnomalyCategorizerStatsDoc,
AnomalyRecordDoc,
} from '../../../common/types/anomalies';

// Service for carrying out Elasticsearch queries to obtain data for the
// ML Results dashboards.
Expand All @@ -31,7 +35,7 @@ interface Influencer {
}

export function resultsServiceProvider(mlClusterClient: ILegacyScopedClusterClient) {
const { callAsInternalUser } = mlClusterClient;
const { callAsInternalUser, callAsCurrentUser } = mlClusterClient;
// Obtains data for the anomalies table, aggregating anomalies by day or hour as requested.
// Return an Object with properties 'anomalies' and 'interval' (interval used to aggregate anomalies,
// one of day, hour or second. Note 'auto' can be provided as the aggregationInterval in the request,
Expand Down Expand Up @@ -430,12 +434,113 @@ export function resultsServiceProvider(mlClusterClient: ILegacyScopedClusterClie
return definition;
}

/**
 * Searches the ML results indices for `categorizer_stats` documents that
 * belong to the given job and returns their `_source` payloads.
 *
 * @param jobId ID of the anomaly detection job; applied as a `term` filter on `job_id`.
 * @param partitionByValue Optional value; when it is a string, an additional
 *   `match` clause on `partition_by_value` is added to the query.
 * @returns The `_source` of every hit in the search response, or an empty
 *   array when the search call yields a falsy response.
 */
async function getCategorizerStats(jobId: string, partitionByValue?: string) {
  type MatchClause = Record<'match', Record<string, string>>;

  const resultTypeClause: MatchClause = { match: { result_type: 'categorizer_stats' } };

  // NOTE(review): the sibling getStoppedPartitions() aggregates on
  // `partition_field_value`; confirm that `partition_by_value` is the intended
  // field name for categorizer_stats documents.
  const must: MatchClause[] =
    typeof partitionByValue === 'string'
      ? [resultTypeClause, { match: { partition_by_value: partitionByValue } }]
      : [resultTypeClause];

  // The search is issued with the requesting user's credentials.
  const searchResponse: SearchResponse<AnomalyCategorizerStatsDoc> = await callAsCurrentUser(
    'search',
    {
      index: ML_RESULTS_INDEX_PATTERN,
      body: {
        query: {
          bool: {
            must,
            filter: [{ term: { job_id: jobId } }],
          },
        },
      },
    }
  );

  if (!searchResponse) {
    return [];
  }
  return searchResponse.hits.hits.map((hit) => hit._source);
}

/**
 * Lists the partition field values of a job whose categorization has a
 * `warn` status.
 *
 * The job configuration is fetched first. Only when
 * `analysis_config.per_partition_categorization.stop_on_warn` is `true` is a
 * search issued for `categorizer_stats` documents with
 * `categorization_status: 'warn'`; the keys of a terms aggregation on
 * `partition_field_value` are then returned.
 *
 * @param jobId ID of the anomaly detection job.
 * @returns The aggregation bucket keys, or an empty array when `stop_on_warn`
 *   is not enabled or the response carries no buckets.
 * @throws Error when the job lookup does not return exactly one job.
 */
async function getStoppedPartitions(jobId: string): Promise<string[]> {
  // The job config is read as the internal user; the results search below
  // runs as the current user.
  const jobConfigResponse = await callAsInternalUser('ml.jobs', {
    jobId,
  });

  // Optional chaining also covers a response without a `jobs` array, so the
  // caller gets the descriptive error below rather than a TypeError.
  if (jobConfigResponse?.jobs?.length !== 1) {
    throw new Error(`Unable to find anomaly detector job with ID ${jobId}`);
  }

  const jobConfig = jobConfigResponse.jobs[0];
  if (jobConfig.analysis_config?.per_partition_categorization?.stop_on_warn !== true) {
    // stop_on_warn is off (or not configured): nothing to report.
    return [];
  }

  const mustMatchClauses: Array<Record<'match', Record<string, string>>> = [
    { match: { result_type: 'categorizer_stats' } },
    { match: { categorization_status: 'warn' } },
  ];

  const results: SearchResponse<any> = await callAsCurrentUser('search', {
    index: ML_RESULTS_INDEX_PATTERN,
    // Only the aggregation is needed, not the matching documents.
    size: 0,
    body: {
      query: {
        bool: {
          must: mustMatchClauses,
          filter: [{ term: { job_id: jobId } }],
        },
      },
      aggs: {
        // NOTE(review): no explicit `size` is set on this terms aggregation,
        // so the number of returned buckets is whatever Elasticsearch
        // defaults to — confirm that is sufficient for jobs with many partitions.
        unique_partition_field_values: {
          terms: {
            field: 'partition_field_value',
          },
        },
      },
    },
  });

  const buckets = results?.aggregations?.unique_partition_field_values?.buckets;
  if (!Array.isArray(buckets)) {
    return [];
  }
  return buckets.map((b: { key: string; doc_count: number }) => b.key);
}

return {
getAnomaliesTableData,
getCategoryDefinition,
getCategoryExamples,
getLatestBucketTimestampByJob,
getMaxAnomalyScore,
getPartitionFieldsValues: getPartitionFieldsValuesFactory(mlClusterClient),
getCategorizerStats,
getStoppedPartitions,
};
}
125 changes: 17 additions & 108 deletions x-pack/plugins/ml/server/routes/results_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import { RequestHandlerContext } from 'kibana/server';
import { schema } from '@kbn/config-schema';
import { SearchResponse } from 'elasticsearch';
import { wrapError } from '../client/error_wrapper';
import { RouteInitialization } from '../types';
import {
Expand All @@ -21,8 +20,6 @@ import { ML_RESULTS_INDEX_PATTERN } from '../../common/constants/index_patterns'
import { jobIdSchema } from './schemas/anomaly_detectors_schema';
import { getCategorizerStatsSchema } from './schemas/results_service_schema';

import { AnomalyCategorizerStatsDoc } from '../../common/types/anomalies';

function getAnomaliesTableData(context: RequestHandlerContext, payload: any) {
const rs = resultsServiceProvider(context.ml!.mlClient);
const {
Expand Down Expand Up @@ -76,6 +73,19 @@ function getPartitionFieldsValues(context: RequestHandlerContext, payload: any)
return rs.getPartitionFieldsValues(jobId, searchTerm, criteriaFields, earliestMs, latestMs);
}

/**
 * Route helper: builds a results service from the request context's ML
 * client and delegates to its `getCategorizerStats`.
 *
 * @param context Request handler context providing `ml.mlClient`.
 * @param params Route params; `jobId` is read from it.
 * @param query Query-string values; `partitionByValue` is read from it.
 * @returns Whatever the results service's `getCategorizerStats` returns.
 */
function getCategorizerStats(context: RequestHandlerContext, params: any, query: any) {
  const jobId = params.jobId;
  const partitionByValue = query.partitionByValue;
  return resultsServiceProvider(context.ml!.mlClient).getCategorizerStats(
    jobId,
    partitionByValue
  );
}

/**
 * Route helper: builds a results service from the request context's ML
 * client and delegates to its `getStoppedPartitions`.
 *
 * @param context Request handler context providing `ml.mlClient`.
 * @param payload Object carrying the `jobId` to look up.
 * @returns Whatever the results service's `getStoppedPartitions` returns.
 */
function getStoppedPartitions(context: RequestHandlerContext, payload: any) {
  const jobId = payload.jobId;
  return resultsServiceProvider(context.ml!.mlClient).getStoppedPartitions(jobId);
}

/**
* Routes for results service
*/
Expand Down Expand Up @@ -292,45 +302,9 @@ export function resultsServiceRoutes({ router, mlLicense }: RouteInitialization)
},
mlLicense.fullLicenseAPIGuard(async (context, request, response) => {
try {
const { jobId } = request.params;
const { partitionByValue } = request.query;
const mustMatchClauses: Array<Record<'match', Record<string, string>>> = [
{
match: {
result_type: 'categorizer_stats',
},
},
];

if (typeof partitionByValue === 'string') {
mustMatchClauses.push({
match: {
partition_by_value: partitionByValue,
},
});
}
const results: SearchResponse<AnomalyCategorizerStatsDoc> = await context.ml!.mlClient.callAsCurrentUser(
'search',
{
index: ML_RESULTS_INDEX_PATTERN,
body: {
query: {
bool: {
must: mustMatchClauses,
filter: [
{
term: {
job_id: jobId,
},
},
],
},
},
},
}
);
const resp = await getCategorizerStats(context, request.params, request.query);
return response.ok({
body: results ? results.hits.hits.map((r) => r._source) : [],
body: resp,
});
} catch (e) {
return response.customError(wrapError(e));
Expand Down Expand Up @@ -358,74 +332,9 @@ export function resultsServiceRoutes({ router, mlLicense }: RouteInitialization)
},
mlLicense.fullLicenseAPIGuard(async (context, request, response) => {
try {
const { jobId } = request.params;
let finalResult: Array<{ key: string; doc_count: number }> = [];
// first determine from job config if stop_on_warn is true
// if false return []
const jobConfigResponse = await context.ml!.mlClient.callAsInternalUser('ml.jobs', {
jobId,
});

if (!jobConfigResponse || jobConfigResponse.jobs.length !== 1) {
return response.customError({
statusCode: 404,
body: `Unable to find anomaly detector job with ID ${jobId}`,
});
}

const jobConfig = jobConfigResponse.jobs[0];
if (jobConfig.analysis_config?.per_partition_categorization?.stop_on_warn === true) {
// search for categorizer_stats documents for the current job where the categorization_status is warn
// Return all the partition_field_value values from the documents found
const mustMatchClauses: Array<Record<'match', Record<string, string>>> = [
{
match: {
result_type: 'categorizer_stats',
},
},
{
match: {
categorization_status: 'warn',
},
},
];
const results: SearchResponse<any> = await context.ml!.mlClient.callAsCurrentUser(
'search',
{
index: ML_RESULTS_INDEX_PATTERN,
size: 0,
body: {
query: {
bool: {
must: mustMatchClauses,
filter: [
{
term: {
job_id: jobId,
},
},
],
},
},
aggs: {
unique_partition_field_values: {
terms: {
field: 'partition_field_value',
},
},
},
},
}
);
if (Array.isArray(results.aggregations?.unique_partition_field_values?.buckets)) {
finalResult = results.aggregations?.unique_partition_field_values?.buckets.map(
(b: { key: string; doc_count: number }) => b.key
);
}
}

const resp = await getStoppedPartitions(context, request.params);
return response.ok({
body: finalResult,
body: resp,
});
} catch (e) {
return response.customError(wrapError(e));
Expand Down

0 comments on commit abe77e7

Please sign in to comment.