From cbc5f0d48be25681eeb855ec02bf20150ef85ba7 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Sun, 22 Mar 2020 17:35:16 +0100 Subject: [PATCH] [ML] fields_aggs_cache.ts as part of fields_service --- .../calculate_model_memory_limit.ts | 122 +----------------- .../fields_service/fields_aggs_cache.ts | 66 ++++++++++ .../models/fields_service/fields_service.ts | 61 ++++++++- 3 files changed, 129 insertions(+), 120 deletions(-) create mode 100644 x-pack/plugins/ml/server/models/fields_service/fields_aggs_cache.ts diff --git a/x-pack/plugins/ml/server/models/calculate_model_memory_limit/calculate_model_memory_limit.ts b/x-pack/plugins/ml/server/models/calculate_model_memory_limit/calculate_model_memory_limit.ts index fb6ff82d8e3c0..70bbc110ac050 100644 --- a/x-pack/plugins/ml/server/models/calculate_model_memory_limit/calculate_model_memory_limit.ts +++ b/x-pack/plugins/ml/server/models/calculate_model_memory_limit/calculate_model_memory_limit.ts @@ -5,7 +5,6 @@ */ import numeral from '@elastic/numeral'; -import { pick } from 'lodash'; import { APICaller } from 'kibana/server'; import { AnalysisConfig } from '../../../common/types/anomaly_detection_jobs'; import { fieldsServiceProvider } from '../fields_service'; @@ -32,88 +31,11 @@ export interface ModelMemoryEstimate { model_memory_estimate: string; } -/** - * Caches cardinality fields values to avoid - * unnecessary aggregations on elasticsearch - */ -const initCardinalityFieldsCache = () => { - const cardinalityCache = new Map< - string, - { - overallCardinality: { [field: string]: number }; - maxBucketCardinality: { [field: string]: number }; - } - >(); - - return { - /** - * Gets requested values from cache - * @param indexPattern - * @param timeField - * @param earliestMs - * @param latestMs - * @param overallCardinalityFields - array of overall cardinality fields to retrieve form cache - * @param maxBucketCardinalityFields - array of max bucket cardinality field to retrieve from cache - */ - getValues( - 
indexPattern: string, - timeField: string, - earliestMs: number, - latestMs: number, - overallCardinalityFields: string[], - maxBucketCardinalityFields: string[] - ): { - overallCardinality: { [field: string]: number }; - maxBucketCardinality: { [field: string]: number }; - } | null { - const cacheKey = indexPattern + timeField + earliestMs + latestMs; - const cached = cardinalityCache.get(cacheKey); - if (!cached) { - return null; - } - - const overallCardinality = pick(cached.overallCardinality, overallCardinalityFields); - const maxBucketCardinality = pick(cached.maxBucketCardinality, maxBucketCardinalityFields); - - return { - overallCardinality, - maxBucketCardinality, - }; - }, - /** - * Extends cache with provided values - */ - updateValues( - indexPattern: string, - timeField: string, - earliestMs: number, - latestMs: number, - update: { - overallCardinality?: { [field: string]: number }; - maxBucketCardinality?: { [field: string]: number }; - } - ): void { - const cacheKey = indexPattern + timeField + earliestMs + latestMs; - const cachedValues = cardinalityCache.get(cacheKey); - if (cachedValues === undefined) { - cardinalityCache.set(cacheKey, { - overallCardinality: update.overallCardinality ?? {}, - maxBucketCardinality: update.maxBucketCardinality ?? {}, - }); - return; - } - - Object.assign(cachedValues.overallCardinality, update.overallCardinality); - Object.assign(cachedValues.maxBucketCardinality, update.maxBucketCardinality); - }, - }; -}; - /** * Retrieves overall and max bucket cardinalities. 
*/ const cardinalityCheckProvider = (callAsCurrentUser: APICaller) => { - const cardinalityFieldsCache = initCardinalityFieldsCache(); + const fieldsService = fieldsServiceProvider(callAsCurrentUser); return async ( analysisConfig: AnalysisConfig, @@ -137,8 +59,6 @@ const cardinalityCheckProvider = (callAsCurrentUser: APICaller) => { 'mlcategory' ); - const fieldsService = fieldsServiceProvider(callAsCurrentUser); - const { detectors, influencers, bucket_span: bucketSpan } = analysisConfig; let overallCardinality = {}; @@ -171,55 +91,27 @@ const cardinalityCheckProvider = (callAsCurrentUser: APICaller) => { !overallCardinalityFields.has(influencerField) ) as string[]; - // Check if some of the values are already cached - const cachedValues = cardinalityFieldsCache.getValues( - indexPattern, - timeFieldName, - earliestMs, - latestMs, - [...overallCardinalityFields], - maxBucketFieldCardinalities - ); - overallCardinality = cachedValues?.overallCardinality ?? {}; - maxBucketCardinality = cachedValues?.maxBucketCardinality ?? 
{}; - - const overallCardinalityFieldsToFetch = [...overallCardinalityFields].filter( - v => !overallCardinality.hasOwnProperty(v) - ); - if (overallCardinalityFieldsToFetch.length > 0) { - const overallCardinalitResp = await fieldsService.getCardinalityOfFields( + if (overallCardinalityFields.size > 0) { + overallCardinality = await fieldsService.getCardinalityOfFields( indexPattern, - overallCardinalityFieldsToFetch, + [...overallCardinalityFields], query, timeFieldName, earliestMs, latestMs ); - overallCardinality = { ...overallCardinality, ...overallCardinalitResp }; - - cardinalityFieldsCache.updateValues(indexPattern, timeFieldName, earliestMs, latestMs, { - overallCardinality: overallCardinalitResp, - }); } - const maxBucketCardinalityFeildToFetch = maxBucketFieldCardinalities.filter( - v => !maxBucketCardinality.hasOwnProperty(v) - ); - if (maxBucketCardinalityFeildToFetch.length > 0) { - const maxBucketCardinalityResp = await fieldsService.getMaxBucketCardinalities( + if (maxBucketFieldCardinalities.length > 0) { + maxBucketCardinality = await fieldsService.getMaxBucketCardinalities( indexPattern, - maxBucketCardinalityFeildToFetch, + maxBucketFieldCardinalities, query, timeFieldName, earliestMs, latestMs, bucketSpan ); - maxBucketCardinality = { ...maxBucketCardinality, ...maxBucketCardinalityResp }; - - cardinalityFieldsCache.updateValues(indexPattern, timeFieldName, earliestMs, latestMs, { - maxBucketCardinality: maxBucketCardinalityResp, - }); } return { diff --git a/x-pack/plugins/ml/server/models/fields_service/fields_aggs_cache.ts b/x-pack/plugins/ml/server/models/fields_service/fields_aggs_cache.ts new file mode 100644 index 0000000000000..f0efd15275d38 --- /dev/null +++ b/x-pack/plugins/ml/server/models/fields_service/fields_aggs_cache.ts @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { pick } from 'lodash'; + +/** + * Aggregation types that are cached. + */ +type AggType = 'overallCardinality' | 'maxBucketCardinality'; + +type CacheStorage = { [key in AggType]: { [field: string]: number } }; + +/** + * Caches cardinality fields values to avoid + * unnecessary aggregations on elasticsearch + */ +export const initCardinalityFieldsCache = () => { + const cardinalityCache = new Map<string, CacheStorage>(); + + return { + /** + * Gets requested values from cache + */ + getValues( + indexPattern: string | string[], + timeField: string, + earliestMs: number, + latestMs: number, + aggType: AggType, + fieldNames: string[] + ): CacheStorage[AggType] | null { + const cacheKey = indexPattern + timeField + earliestMs + latestMs; + const cached = cardinalityCache.get(cacheKey); + if (!cached) { + return null; + } + return pick(cached[aggType], fieldNames); + }, + /** + * Extends cache with provided values + */ + updateValues( + indexPattern: string | string[], + timeField: string, + earliestMs: number, + latestMs: number, + update: Partial<CacheStorage> + ): void { + const cacheKey = indexPattern + timeField + earliestMs + latestMs; + const cachedValues = cardinalityCache.get(cacheKey); + if (cachedValues === undefined) { + cardinalityCache.set(cacheKey, { + overallCardinality: update.overallCardinality ?? {}, + maxBucketCardinality: update.maxBucketCardinality ?? 
{}, + }); + return; + } + + Object.assign(cachedValues.overallCardinality, update.overallCardinality); + Object.assign(cachedValues.maxBucketCardinality, update.maxBucketCardinality); + }, + }; +}; diff --git a/x-pack/plugins/ml/server/models/fields_service/fields_service.ts b/x-pack/plugins/ml/server/models/fields_service/fields_service.ts index aae79706c5588..567c5d2afb7de 100644 --- a/x-pack/plugins/ml/server/models/fields_service/fields_service.ts +++ b/x-pack/plugins/ml/server/models/fields_service/fields_service.ts @@ -7,12 +7,15 @@ import Boom from 'boom'; import { APICaller } from 'kibana/server'; import { parseInterval } from '../../../common/util/parse_interval'; +import { initCardinalityFieldsCache } from './fields_aggs_cache'; /** * Service for carrying out queries to obtain data * specific to fields in Elasticsearch indices. */ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { + const fieldsAggsCache = initCardinalityFieldsCache(); + /** * Gets aggregatable fields. */ @@ -58,6 +61,23 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { return {}; } + const cachedValues = + fieldsAggsCache.getValues( + index, + timeFieldName, + earliestMs, + latestMs, + 'overallCardinality', + fieldNames + ) ?? {}; + + // No need to perform aggregation over the cached fields + const fieldsToAgg = aggregatableFields.filter(field => !cachedValues.hasOwnProperty(field)); + + if (fieldsToAgg.length === 0) { + return cachedValues; + } + // Build the criteria to use in the bool filter part of the request. // Add criteria for the time range and the datafeed config query. 
const mustCriteria = [ @@ -76,7 +96,7 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { mustCriteria.push(query); } - const aggs = aggregatableFields.reduce((obj, field) => { + const aggs = fieldsToAgg.reduce((obj, field) => { obj[field] = { cardinality: { field } }; return obj; }, {} as { [field: string]: { cardinality: { field: string } } }); @@ -105,10 +125,19 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { return {}; } - return aggregatableFields.reduce((obj, field) => { + const aggResult = fieldsToAgg.reduce((obj, field) => { obj[field] = (aggregations[field] || { value: 0 }).value; return obj; }, {} as { [field: string]: number }); + + fieldsAggsCache.updateValues(index, timeFieldName, earliestMs, latestMs, { + overallCardinality: aggResult, + }); + + return { + ...cachedValues, + ...aggResult, + }; } /** @@ -214,6 +243,23 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { return {}; } + const cachedValues = + fieldsAggsCache.getValues( + index, + timeFieldName, + earliestMs, + latestMs, + 'maxBucketCardinality', + fieldNames + ) ?? 
{}; + + // No need to perform aggregation over the cached fields + const fieldsToAgg = aggregatableFields.filter(field => !cachedValues.hasOwnProperty(field)); + + if (fieldsToAgg.length === 0) { + return cachedValues; + } + const { start, end } = getSafeTimeRange(earliestMs, latestMs, interval); const mustCriteria = [ @@ -239,7 +285,7 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { const getSafeAggName = (field: string) => field.replace(/\W/g, ''); const getMaxBucketAggKey = (field: string) => `max_bucket_${field}`; - const fieldsCardinalityAggs = aggregatableFields.reduce((obj, field) => { + const fieldsCardinalityAggs = fieldsToAgg.reduce((obj, field) => { obj[getSafeAggName(field)] = { cardinality: { field } }; return obj; }, {} as { [field: string]: { cardinality: { field: string } } }); @@ -280,13 +326,18 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { )?.aggregations; if (!aggregations) { - return {}; + return cachedValues; } - return aggregatableFields.reduce((obj, field) => { + const aggResult = fieldsToAgg.reduce((obj, field) => { obj[field] = (aggregations[getMaxBucketAggKey(field)] || { value: 0 }).value ?? 0; return obj; }, {} as { [field: string]: number }); + + return { + ...cachedValues, + ...aggResult, + }; } return {