Skip to content

Commit

Permalink
[ML] address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
darnautov committed Feb 11, 2020
1 parent 9ca99ad commit 63ea75b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import fs from 'fs';
import Boom from 'boom';
import numeral from '@elastic/numeral';
import { CallAPIOptions, RequestHandlerContext, SavedObjectsClient } from 'kibana/server';
import { CallAPIOptions, RequestHandlerContext, SavedObjectsClientContract } from 'kibana/server';
import { merge } from 'lodash';
import { MlJob } from '../../../common/types/jobs';
import {
Expand Down Expand Up @@ -66,13 +66,11 @@ interface Config {
}

interface Result {
id: any;
title: any;
id: string;
title: string;
query: any;
description: any;
logo: {
icon: string;
} | null;
description: string;
logo: { icon: string } | null;
}

interface JobStat {
Expand Down Expand Up @@ -109,16 +107,17 @@ export class DataRecognizer {
modulesDir = `${__dirname}/modules`;
indexPatternName: string = '';
indexPatternId: string | undefined = undefined;
savedObjectsClient: SavedObjectsClient | undefined = undefined;
savedObjectsClient: SavedObjectsClientContract;

callWithRequest: (
callAsCurrentUser: (
endpoint: string,
clientParams?: Record<string, any>,
options?: CallAPIOptions
) => Promise<any>;

constructor(context: RequestHandlerContext) {
this.callWithRequest = context.ml!.mlClient.callAsCurrentUser;
this.callAsCurrentUser = context.ml!.mlClient.callAsCurrentUser;
this.savedObjectsClient = context.core.savedObjects.client;
}

// list all directories under the given directory
Expand Down Expand Up @@ -240,7 +239,7 @@ export class DataRecognizer {
query: moduleConfig.query,
};

const resp = await this.callWithRequest('search', {
const resp = await this.callAsCurrentUser('search', {
index,
rest_total_hits_as_int: true,
size,
Expand Down Expand Up @@ -377,11 +376,8 @@ export class DataRecognizer {
start: number,
end: number,
jobOverrides: JobOverride[],
datafeedOverrides: DatafeedOverride[],
savedObjectsClient: SavedObjectsClient
datafeedOverrides: DatafeedOverride[]
) {
this.savedObjectsClient = savedObjectsClient;

// load the config from disk
const moduleConfig = await this.getModule(moduleId, jobPrefix);

Expand Down Expand Up @@ -496,7 +492,7 @@ export class DataRecognizer {
// Add a wildcard at the front of each of the job IDs in the module,
// as a prefix may have been supplied when creating the jobs in the module.
const jobIds = module.jobs.map(job => `*${job.id}`);
const { jobsExist } = jobServiceProvider(this.callWithRequest);
const { jobsExist } = jobServiceProvider(this.callAsCurrentUser);
const jobInfo = await jobsExist(jobIds);

// Check if the value for any of the jobs is false.
Expand All @@ -505,11 +501,11 @@ export class DataRecognizer {

if (doJobsExist === true) {
// Get the IDs of the jobs created from the module, and their earliest / latest timestamps.
const jobStats: MlJobStats = await this.callWithRequest('ml.jobStats', { jobId: jobIds });
const jobStats: MlJobStats = await this.callAsCurrentUser('ml.jobStats', { jobId: jobIds });
const jobStatsJobs: JobStat[] = [];
if (jobStats.jobs && jobStats.jobs.length > 0) {
const foundJobIds = jobStats.jobs.map(job => job.job_id);
const { getLatestBucketTimestampByJob } = resultsServiceProvider(this.callWithRequest);
const { getLatestBucketTimestampByJob } = resultsServiceProvider(this.callAsCurrentUser);
const latestBucketTimestampsByJob = await getLatestBucketTimestampByJob(foundJobIds);

jobStats.jobs.forEach(job => {
Expand All @@ -536,7 +532,7 @@ export class DataRecognizer {
}

async loadIndexPatterns() {
return await this.savedObjectsClient!.find({ type: 'index-pattern', perPage: 1000 });
return await this.savedObjectsClient.find({ type: 'index-pattern', perPage: 1000 });
}

// returns a id based on an index pattern name
Expand Down Expand Up @@ -624,7 +620,7 @@ export class DataRecognizer {

// find all existing savedObjects for a given type
loadExistingSavedObjects(type: string) {
return this.savedObjectsClient!.find({ type, perPage: 1000 });
return this.savedObjectsClient.find({ type, perPage: 1000 });
}

// save the savedObjects if they do not exist already
Expand All @@ -634,7 +630,7 @@ export class DataRecognizer {
.filter(o => o.exists === false)
.map(o => o.savedObject);
if (filteredSavedObjects.length) {
results = await this.savedObjectsClient!.bulkCreate(
results = await this.savedObjectsClient.bulkCreate(
// Add an empty migrationVersion attribute to each saved object to ensure
// it is automatically migrated to the 7.0+ format with a references attribute.
filteredSavedObjects.map(doc => ({ ...doc, migrationVersion: doc.migrationVersion || {} }))
Expand Down Expand Up @@ -663,7 +659,7 @@ export class DataRecognizer {

async saveJob(job: ModuleJob) {
const { id: jobId, config: body } = job;
return this.callWithRequest('ml.addJob', { jobId, body });
return this.callAsCurrentUser('ml.addJob', { jobId, body });
}

// save the datafeeds.
Expand All @@ -684,7 +680,7 @@ export class DataRecognizer {

async saveDatafeed(datafeed: ModuleDataFeed) {
const { id: datafeedId, config: body } = datafeed;
return this.callWithRequest('ml.addDatafeed', { datafeedId, body });
return this.callAsCurrentUser('ml.addDatafeed', { datafeedId, body });
}

async startDatafeeds(
Expand All @@ -707,7 +703,7 @@ export class DataRecognizer {
const result = { started: false } as DatafeedResponse;
let opened = false;
try {
const openResult = await this.callWithRequest('ml.openJob', {
const openResult = await this.callAsCurrentUser('ml.openJob', {
jobId: datafeed.config.job_id,
});
opened = openResult.opened;
Expand All @@ -731,7 +727,7 @@ export class DataRecognizer {
duration.end = end;
}

await this.callWithRequest('ml.startDatafeed', { datafeedId: datafeed.id, ...duration });
await this.callAsCurrentUser('ml.startDatafeed', { datafeedId: datafeed.id, ...duration });
result.started = true;
} catch (error) {
result.started = false;
Expand Down Expand Up @@ -797,7 +793,7 @@ export class DataRecognizer {
jobs: moduleConfig.jobs,
datafeeds: moduleConfig.datafeeds,
kibana: moduleConfig.kibana,
} as const;
};

function createResultsItems(
configItems: any[],
Expand Down Expand Up @@ -936,7 +932,7 @@ export class DataRecognizer {
// ensure the model memory limit for each job is not greater than
// the max model memory setting for the cluster
async updateModelMemoryLimits(moduleConfig: Module) {
const { limits } = await this.callWithRequest('ml.info');
const { limits } = await this.callAsCurrentUser('ml.info');
const maxMml = limits.max_model_memory_limit;
if (maxMml !== undefined) {
// @ts-ignore
Expand Down
25 changes: 25 additions & 0 deletions x-pack/legacy/plugins/ml/server/new_platform/modules.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { schema } from '@kbn/config-schema';

export const setupModuleBodySchema = schema.object({
prefix: schema.maybe(schema.string()),
groups: schema.maybe(schema.arrayOf(schema.string())),
indexPatternName: schema.string(),
query: schema.maybe(schema.any()),
useDedicatedIndex: schema.maybe(schema.boolean()),
startDatafeed: schema.maybe(schema.boolean()),
start: schema.maybe(schema.number()),
end: schema.maybe(schema.number()),
jobOverrides: schema.maybe(schema.any()),
datafeedOverrides: schema.maybe(schema.any()),
});

export const getModuleIdParamSchema = (optional = false) => {
const stringType = schema.string();
return { moduleId: optional ? schema.maybe(stringType) : stringType };
};
39 changes: 10 additions & 29 deletions x-pack/legacy/plugins/ml/server/routes/modules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
*/

import { schema } from '@kbn/config-schema';
import { RequestHandlerContext, SavedObjectsClient } from 'kibana/server';
import { RequestHandlerContext } from 'kibana/server';
import { DatafeedOverride, JobOverride } from '../../common/types/modules';
import { wrapError } from '../client/error_wrapper';
import { getSavedObjectsClient } from '../lib/ml_telemetry';
import { DataRecognizer } from '../models/data_recognizer';
import { licensePreRoutingFactory } from '../new_platform/licence_check_pre_routing_factory';
import { getModuleIdParamSchema, setupModuleBodySchema } from '../new_platform/modules';
import { RouteInitialization } from '../new_platform/plugin';

function recognize(context: RequestHandlerContext, indexPatternTitle: string) {
Expand Down Expand Up @@ -39,8 +39,7 @@ function saveModuleItems(
start: number,
end: number,
jobOverrides: JobOverride[],
datafeedOverrides: DatafeedOverride[],
savedObjectsClient: SavedObjectsClient
datafeedOverrides: DatafeedOverride[]
) {
const dr = new DataRecognizer(context);
return dr.setupModuleItems(
Expand All @@ -54,8 +53,7 @@ function saveModuleItems(
start,
end,
jobOverrides,
datafeedOverrides,
savedObjectsClient
datafeedOverrides
);
}

Expand All @@ -67,12 +65,7 @@ function dataRecognizerJobsExist(context: RequestHandlerContext, moduleId: strin
/**
* Recognizer routes.
*/
export function dataRecognizer({
xpackMainPlugin,
router,
elasticsearchPlugin,
savedObjects,
}: RouteInitialization) {
export function dataRecognizer({ xpackMainPlugin, router }: RouteInitialization) {
/**
* @apiGroup DataRecognizer
*
Expand Down Expand Up @@ -117,7 +110,7 @@ export function dataRecognizer({
path: '/api/ml/modules/get_module/{moduleId?}',
validate: {
params: schema.object({
moduleId: schema.maybe(schema.string()),
...getModuleIdParamSchema(true),
}),
},
},
Expand Down Expand Up @@ -152,20 +145,9 @@ export function dataRecognizer({
path: '/api/ml/modules/setup/{moduleId}',
validate: {
params: schema.object({
moduleId: schema.string(),
}),
body: schema.object({
prefix: schema.string(),
groups: schema.maybe(schema.arrayOf(schema.string())),
indexPatternName: schema.string(),
query: schema.maybe(schema.any()),
useDedicatedIndex: schema.maybe(schema.boolean()),
startDatafeed: schema.maybe(schema.boolean()),
start: schema.maybe(schema.number()),
end: schema.maybe(schema.number()),
jobOverrides: schema.maybe(schema.any()),
datafeedOverrides: schema.maybe(schema.any()),
...getModuleIdParamSchema(),
}),
body: setupModuleBodySchema,
},
},
licensePreRoutingFactory(xpackMainPlugin, async (context, request, response) => {
Expand Down Expand Up @@ -197,8 +179,7 @@ export function dataRecognizer({
start,
end,
jobOverrides,
datafeedOverrides,
getSavedObjectsClient(elasticsearchPlugin, savedObjects!)
datafeedOverrides
);

return response.ok({ body: result });
Expand All @@ -222,7 +203,7 @@ export function dataRecognizer({
path: '/api/ml/modules/jobs_exist/{moduleId}',
validate: {
params: schema.object({
moduleId: schema.string(),
...getModuleIdParamSchema(),
}),
},
},
Expand Down

0 comments on commit 63ea75b

Please sign in to comment.