Skip to content

Commit

Permalink
[ML] fields_aggs_cache.ts as part of fields_service
Browse files Browse the repository at this point in the history
  • Loading branch information
darnautov committed Mar 22, 2020
1 parent a46aa53 commit cbc5f0d
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

import numeral from '@elastic/numeral';
import { pick } from 'lodash';
import { APICaller } from 'kibana/server';
import { AnalysisConfig } from '../../../common/types/anomaly_detection_jobs';
import { fieldsServiceProvider } from '../fields_service';
Expand All @@ -32,88 +31,11 @@ export interface ModelMemoryEstimate {
model_memory_estimate: string;
}

/**
 * Creates an in-memory cache of cardinality values to avoid
 * unnecessary aggregations on elasticsearch.
 *
 * Entries are keyed by the combination of index pattern, time field and
 * time range. Each entry holds two maps of field name to cardinality:
 * `overallCardinality` and `maxBucketCardinality`.
 * Entries are only ever added or extended; nothing in here evicts them.
 */
const initCardinalityFieldsCache = () => {
  const cardinalityCache = new Map<
    string,
    {
      overallCardinality: { [field: string]: number };
      maxBucketCardinality: { [field: string]: number };
    }
  >();

  /**
   * Builds an unambiguous cache key.
   * Plain string concatenation would make e.g. (earliestMs=1, latestMs=23)
   * and (earliestMs=12, latestMs=3) share the same key, so the parts are
   * serialized as a JSON array instead.
   */
  const getCacheKey = (
    indexPattern: string,
    timeField: string,
    earliestMs: number,
    latestMs: number
  ): string => JSON.stringify([indexPattern, timeField, earliestMs, latestMs]);

  return {
    /**
     * Gets requested values from cache
     * @param indexPattern
     * @param timeField
     * @param earliestMs
     * @param latestMs
     * @param overallCardinalityFields - array of overall cardinality fields to retrieve from cache
     * @param maxBucketCardinalityFields - array of max bucket cardinality fields to retrieve from cache
     * @returns the cached subset of values for the requested fields,
     * or `null` when there is no entry for the given index pattern / time field / time range
     */
    getValues(
      indexPattern: string,
      timeField: string,
      earliestMs: number,
      latestMs: number,
      overallCardinalityFields: string[],
      maxBucketCardinalityFields: string[]
    ): {
      overallCardinality: { [field: string]: number };
      maxBucketCardinality: { [field: string]: number };
    } | null {
      const cacheKey = getCacheKey(indexPattern, timeField, earliestMs, latestMs);
      const cached = cardinalityCache.get(cacheKey);
      if (!cached) {
        return null;
      }

      const overallCardinality = pick(cached.overallCardinality, overallCardinalityFields);
      const maxBucketCardinality = pick(cached.maxBucketCardinality, maxBucketCardinalityFields);

      return {
        overallCardinality,
        maxBucketCardinality,
      };
    },
    /**
     * Extends cache with provided values
     * @param indexPattern
     * @param timeField
     * @param earliestMs
     * @param latestMs
     * @param update - values to merge into the entry; omitted aggregation types are left untouched
     */
    updateValues(
      indexPattern: string,
      timeField: string,
      earliestMs: number,
      latestMs: number,
      update: {
        overallCardinality?: { [field: string]: number };
        maxBucketCardinality?: { [field: string]: number };
      }
    ): void {
      const cacheKey = getCacheKey(indexPattern, timeField, earliestMs, latestMs);
      const cachedValues = cardinalityCache.get(cacheKey);
      if (cachedValues === undefined) {
        // Store shallow copies so that later merges into the cache entry
        // never mutate the objects owned by the caller.
        cardinalityCache.set(cacheKey, {
          overallCardinality: { ...update.overallCardinality },
          maxBucketCardinality: { ...update.maxBucketCardinality },
        });
        return;
      }

      // Object.assign ignores undefined sources, so a partial update is safe here.
      Object.assign(cachedValues.overallCardinality, update.overallCardinality);
      Object.assign(cachedValues.maxBucketCardinality, update.maxBucketCardinality);
    },
  };
};

/**
* Retrieves overall and max bucket cardinalities.
*/
const cardinalityCheckProvider = (callAsCurrentUser: APICaller) => {
const cardinalityFieldsCache = initCardinalityFieldsCache();
const fieldsService = fieldsServiceProvider(callAsCurrentUser);

return async (
analysisConfig: AnalysisConfig,
Expand All @@ -137,8 +59,6 @@ const cardinalityCheckProvider = (callAsCurrentUser: APICaller) => {
'mlcategory'
);

const fieldsService = fieldsServiceProvider(callAsCurrentUser);

const { detectors, influencers, bucket_span: bucketSpan } = analysisConfig;

let overallCardinality = {};
Expand Down Expand Up @@ -171,55 +91,27 @@ const cardinalityCheckProvider = (callAsCurrentUser: APICaller) => {
!overallCardinalityFields.has(influencerField)
) as string[];

// Check if some of the values are already cached
const cachedValues = cardinalityFieldsCache.getValues(
indexPattern,
timeFieldName,
earliestMs,
latestMs,
[...overallCardinalityFields],
maxBucketFieldCardinalities
);
overallCardinality = cachedValues?.overallCardinality ?? {};
maxBucketCardinality = cachedValues?.maxBucketCardinality ?? {};

const overallCardinalityFieldsToFetch = [...overallCardinalityFields].filter(
v => !overallCardinality.hasOwnProperty(v)
);
if (overallCardinalityFieldsToFetch.length > 0) {
const overallCardinalitResp = await fieldsService.getCardinalityOfFields(
if (overallCardinalityFields.size > 0) {
overallCardinality = await fieldsService.getCardinalityOfFields(
indexPattern,
overallCardinalityFieldsToFetch,
[...overallCardinalityFields],
query,
timeFieldName,
earliestMs,
latestMs
);
overallCardinality = { ...overallCardinality, ...overallCardinalitResp };

cardinalityFieldsCache.updateValues(indexPattern, timeFieldName, earliestMs, latestMs, {
overallCardinality: overallCardinalitResp,
});
}

const maxBucketCardinalityFeildToFetch = maxBucketFieldCardinalities.filter(
v => !maxBucketCardinality.hasOwnProperty(v)
);
if (maxBucketCardinalityFeildToFetch.length > 0) {
const maxBucketCardinalityResp = await fieldsService.getMaxBucketCardinalities(
if (maxBucketFieldCardinalities.length > 0) {
maxBucketCardinality = await fieldsService.getMaxBucketCardinalities(
indexPattern,
maxBucketCardinalityFeildToFetch,
maxBucketFieldCardinalities,
query,
timeFieldName,
earliestMs,
latestMs,
bucketSpan
);
maxBucketCardinality = { ...maxBucketCardinality, ...maxBucketCardinalityResp };

cardinalityFieldsCache.updateValues(indexPattern, timeFieldName, earliestMs, latestMs, {
maxBucketCardinality: maxBucketCardinalityResp,
});
}

return {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { pick } from 'lodash';

/**
* Aggregation types that are cached.
*/
type AggType = 'overallCardinality' | 'maxBucketCardinality';

type CacheStorage = { [key in AggType]: { [field: string]: number } };

/**
 * Creates an in-memory cache of cardinality values to avoid
 * unnecessary aggregations on elasticsearch.
 *
 * Entries are keyed by the combination of index pattern, time field and
 * time range. Each entry holds one map of field name to value per
 * aggregation type. Entries are only ever added or extended; nothing in
 * here evicts them.
 */
export const initCardinalityFieldsCache = () => {
  const cardinalityCache = new Map<string, CacheStorage>();

  /**
   * Builds an unambiguous cache key.
   * Plain string concatenation would make e.g. (earliestMs=1, latestMs=23)
   * and (earliestMs=12, latestMs=3) share the same key, so the parts are
   * serialized as a JSON array instead. An array of index names is joined
   * with commas first, so `['a', 'b']` and `'a,b'` keep resolving to the
   * same entry.
   */
  const getCacheKey = (
    indexPattern: string | string[],
    timeField: string,
    earliestMs: number,
    latestMs: number
  ): string => {
    const index = Array.isArray(indexPattern) ? indexPattern.join(',') : indexPattern;
    return JSON.stringify([index, timeField, earliestMs, latestMs]);
  };

  return {
    /**
     * Gets requested values from cache
     * @param indexPattern
     * @param timeField
     * @param earliestMs
     * @param latestMs
     * @param aggType - aggregation type to read the values of
     * @param fieldNames - fields to retrieve from cache
     * @returns the cached subset of values for the requested fields,
     * or `null` when there is no entry for the given index pattern / time field / time range
     */
    getValues(
      indexPattern: string | string[],
      timeField: string,
      earliestMs: number,
      latestMs: number,
      aggType: AggType,
      fieldNames: string[]
    ): CacheStorage[AggType] | null {
      const cacheKey = getCacheKey(indexPattern, timeField, earliestMs, latestMs);
      const cached = cardinalityCache.get(cacheKey);
      if (!cached) {
        return null;
      }
      return pick(cached[aggType], fieldNames);
    },
    /**
     * Extends cache with provided values
     * @param indexPattern
     * @param timeField
     * @param earliestMs
     * @param latestMs
     * @param update - values to merge into the entry; omitted aggregation types are left untouched
     */
    updateValues(
      indexPattern: string | string[],
      timeField: string,
      earliestMs: number,
      latestMs: number,
      update: Partial<CacheStorage>
    ): void {
      const cacheKey = getCacheKey(indexPattern, timeField, earliestMs, latestMs);
      const cachedValues = cardinalityCache.get(cacheKey);
      if (cachedValues === undefined) {
        // Store shallow copies so that later merges into the cache entry
        // never mutate the objects owned by the caller.
        cardinalityCache.set(cacheKey, {
          overallCardinality: { ...update.overallCardinality },
          maxBucketCardinality: { ...update.maxBucketCardinality },
        });
        return;
      }

      // Object.assign ignores undefined sources, so a partial update is safe here.
      Object.assign(cachedValues.overallCardinality, update.overallCardinality);
      Object.assign(cachedValues.maxBucketCardinality, update.maxBucketCardinality);
    },
  };
};
61 changes: 56 additions & 5 deletions x-pack/plugins/ml/server/models/fields_service/fields_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
import Boom from 'boom';
import { APICaller } from 'kibana/server';
import { parseInterval } from '../../../common/util/parse_interval';
import { initCardinalityFieldsCache } from './fields_aggs_cache';

/**
* Service for carrying out queries to obtain data
* specific to fields in Elasticsearch indices.
*/
export function fieldsServiceProvider(callAsCurrentUser: APICaller) {
const fieldsAggsCache = initCardinalityFieldsCache();

/**
* Gets aggregatable fields.
*/
Expand Down Expand Up @@ -58,6 +61,23 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) {
return {};
}

const cachedValues =
fieldsAggsCache.getValues(
index,
timeFieldName,
earliestMs,
latestMs,
'overallCardinality',
fieldNames
) ?? {};

// No need to perform aggregation over the cached fields
const fieldsToAgg = aggregatableFields.filter(field => !cachedValues.hasOwnProperty(field));

if (fieldsToAgg.length === 0) {
return cachedValues;
}

// Build the criteria to use in the bool filter part of the request.
// Add criteria for the time range and the datafeed config query.
const mustCriteria = [
Expand All @@ -76,7 +96,7 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) {
mustCriteria.push(query);
}

const aggs = aggregatableFields.reduce((obj, field) => {
const aggs = fieldsToAgg.reduce((obj, field) => {
obj[field] = { cardinality: { field } };
return obj;
}, {} as { [field: string]: { cardinality: { field: string } } });
Expand Down Expand Up @@ -105,10 +125,19 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) {
return {};
}

return aggregatableFields.reduce((obj, field) => {
const aggResult = fieldsToAgg.reduce((obj, field) => {
obj[field] = (aggregations[field] || { value: 0 }).value;
return obj;
}, {} as { [field: string]: number });

fieldsAggsCache.updateValues(index, timeFieldName, earliestMs, latestMs, {
overallCardinality: aggResult,
});

return {
...cachedValues,
...aggResult,
};
}

/**
Expand Down Expand Up @@ -214,6 +243,23 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) {
return {};
}

const cachedValues =
fieldsAggsCache.getValues(
index,
timeFieldName,
earliestMs,
latestMs,
'maxBucketCardinality',
fieldNames
) ?? {};

// No need to perform aggregation over the cached fields
const fieldsToAgg = aggregatableFields.filter(field => !cachedValues.hasOwnProperty(field));

if (fieldsToAgg.length === 0) {
return cachedValues;
}

const { start, end } = getSafeTimeRange(earliestMs, latestMs, interval);

const mustCriteria = [
Expand All @@ -239,7 +285,7 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) {
const getSafeAggName = (field: string) => field.replace(/\W/g, '');
const getMaxBucketAggKey = (field: string) => `max_bucket_${field}`;

const fieldsCardinalityAggs = aggregatableFields.reduce((obj, field) => {
const fieldsCardinalityAggs = fieldsToAgg.reduce((obj, field) => {
obj[getSafeAggName(field)] = { cardinality: { field } };
return obj;
}, {} as { [field: string]: { cardinality: { field: string } } });
Expand Down Expand Up @@ -280,13 +326,18 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) {
)?.aggregations;

if (!aggregations) {
return {};
return cachedValues;
}

return aggregatableFields.reduce((obj, field) => {
const aggResult = fieldsToAgg.reduce((obj, field) => {
obj[field] = (aggregations[getMaxBucketAggKey(field)] || { value: 0 }).value ?? 0;
return obj;
}, {} as { [field: string]: number });

return {
...cachedValues,
...aggResult,
};
}

return {
Expand Down

0 comments on commit cbc5f0d

Please sign in to comment.