Skip to content

Commit

Permalink
Update logstash files to ts where we read from source (elastic#86787)
Browse files Browse the repository at this point in the history
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
chrisronline and kibanamachine committed Jan 12, 2021
1 parent efaebd8 commit 2084f2d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 67 deletions.
31 changes: 31 additions & 0 deletions x-pack/plugins/monitoring/common/types/es.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,40 @@ export interface ElasticsearchSourceKibanaStats {
};
}

export interface ElasticsearchSourceLogstashPipelineVertex {
id: string;
plugin_type: string;
stats?: {
[key: string]: {
data?: any[];
};
};
}

export interface ElasticsearchSource {
timestamp: string;
kibana_stats?: ElasticsearchSourceKibanaStats;
logstash_state?: {
pipeline?: {
representation?: {
graph?: {
vertices?: ElasticsearchSourceLogstashPipelineVertex[];
};
};
};
};
logstash_stats?: {
timestamp?: string;
logstash?: {};
events?: {};
reloads?: {};
queue?: {
type?: string;
};
jvm?: {
uptime_in_millis?: number;
};
};
beats_stats?: {
timestamp?: string;
beat?: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,31 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { get, merge } from 'lodash';
import { merge } from 'lodash';
// @ts-ignore
import { checkParam } from '../error_missing_required';
import { calculateAvailability } from './../calculate_availability';
// @ts-ignore
import { calculateAvailability } from '../calculate_availability';
import { LegacyRequest, ElasticsearchResponse } from '../../types';

export function handleResponse(resp) {
const source = get(resp, 'hits.hits[0]._source.logstash_stats');
const logstash = get(source, 'logstash');
export function handleResponse(resp: ElasticsearchResponse) {
const source = resp.hits?.hits[0]?._source?.logstash_stats;
const logstash = source?.logstash;
const info = merge(logstash, {
availability: calculateAvailability(get(source, 'timestamp')),
events: get(source, 'events'),
reloads: get(source, 'reloads'),
queue_type: get(source, 'queue.type'),
uptime: get(source, 'jvm.uptime_in_millis'),
availability: calculateAvailability(source?.timestamp),
events: source?.events,
reloads: source?.reloads,
queue_type: source?.queue?.type,
uptime: source?.jvm?.uptime_in_millis,
});
return info;
}

export function getNodeInfo(req, lsIndexPattern, { clusterUuid, logstashUuid }) {
export function getNodeInfo(
req: LegacyRequest,
lsIndexPattern: string,
{ clusterUuid, logstashUuid }: { clusterUuid: string; logstashUuid: string }
) {
checkParam(lsIndexPattern, 'lsIndexPattern in getNodeInfo');

const params = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,24 @@

import boom from '@hapi/boom';
import { get } from 'lodash';
// @ts-ignore
import { checkParam } from '../error_missing_required';
import { getPipelineStateDocument } from './get_pipeline_state_document';
// @ts-ignore
import { getPipelineStatsAggregation } from './get_pipeline_stats_aggregation';
// @ts-ignore
import { calculateTimeseriesInterval } from '../calculate_timeseries_interval';
import { LegacyRequest } from '../../types';
import {
ElasticsearchSource,
ElasticsearchSourceLogstashPipelineVertex,
} from '../../../common/types/es';

export function _vertexStats(
vertex,
vertexStatsBucket,
totalProcessorsDurationInMillis,
timeseriesIntervalInSeconds
vertex: ElasticsearchSourceLogstashPipelineVertex,
vertexStatsBucket: any,
totalProcessorsDurationInMillis: number,
timeseriesIntervalInSeconds: number
) {
const isInput = vertex.plugin_type === 'input';
const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output';
Expand All @@ -27,8 +35,11 @@ export function _vertexStats(

const durationInMillis = vertexStatsBucket.duration_in_millis_total.value;

const processorStats = {};
const eventsProcessedStats = {
const processorStats: any = {};
const eventsProcessedStats: {
events_out_per_millisecond: number;
events_in_per_millisecond?: number;
} = {
events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis,
};

Expand Down Expand Up @@ -63,14 +74,14 @@ export function _vertexStats(
* @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
*/
export function _enrichStateWithStatsAggregation(
stateDocument,
statsAggregation,
timeseriesIntervalInSeconds
stateDocument: ElasticsearchSource,
statsAggregation: any,
timeseriesIntervalInSeconds: number
) {
const logstashState = stateDocument.logstash_state;
const vertices = logstashState.pipeline.representation.graph.vertices;
const vertices = logstashState?.pipeline?.representation?.graph?.vertices ?? [];

const verticesById = {};
const verticesById: any = {};
vertices.forEach((vertex) => {
verticesById[vertex.id] = vertex;
vertex.stats = {};
Expand All @@ -82,7 +93,7 @@ export function _enrichStateWithStatsAggregation(

const verticesWithStatsBuckets =
statsAggregation.aggregations.pipelines.scoped.vertices.vertex_id.buckets;
verticesWithStatsBuckets.forEach((vertexStatsBucket) => {
verticesWithStatsBuckets.forEach((vertexStatsBucket: any) => {
// Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval
const vertexId = vertexStatsBucket.key;
const vertex = verticesById[vertexId];
Expand All @@ -98,13 +109,20 @@ export function _enrichStateWithStatsAggregation(
}
});

return stateDocument.logstash_state.pipeline;
return stateDocument.logstash_state?.pipeline;
}

export async function getPipeline(req, config, lsIndexPattern, clusterUuid, pipelineId, version) {
export async function getPipeline(
req: LegacyRequest,
config: { get: (key: string) => string | undefined },
lsIndexPattern: string,
clusterUuid: string,
pipelineId: string,
version: { firstSeen: string; lastSeen: string; hash: string }
) {
checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');

const options = {
const options: any = {
clusterUuid,
pipelineId,
version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@
* you may not use this file except in compliance with the Elastic License.
*/

// @ts-ignore
import { createQuery } from '../create_query';
// @ts-ignore
import { LogstashMetric } from '../metrics';
import { get } from 'lodash';
import { LegacyRequest, ElasticsearchResponse } from '../../types';

export async function getPipelineStateDocument(
req,
logstashIndexPattern,
{ clusterUuid, pipelineId, version }
req: LegacyRequest,
logstashIndexPattern: string,
{
clusterUuid,
pipelineId,
version,
}: { clusterUuid: string; pipelineId: string; version: { hash: string } }
) {
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const { callWithRequest } = req.server.plugins?.elasticsearch.getCluster('monitoring');
const filters = [
{ term: { 'logstash_state.pipeline.id': pipelineId } },
{ term: { 'logstash_state.pipeline.hash': version.hash } },
Expand Down Expand Up @@ -43,8 +49,8 @@ export async function getPipelineStateDocument(
},
};

const resp = await callWithRequest(req, 'search', params);
const resp = (await callWithRequest(req, 'search', params)) as ElasticsearchResponse;

// Return null if doc not found
return get(resp, 'hits.hits[0]._source', null);
return resp.hits?.hits[0]?._source ?? null;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,24 @@

import boom from '@hapi/boom';
import { get } from 'lodash';
// @ts-ignore
import { checkParam } from '../error_missing_required';
import { getPipelineStateDocument } from './get_pipeline_state_document';
// @ts-ignore
import { getPipelineVertexStatsAggregation } from './get_pipeline_vertex_stats_aggregation';
// @ts-ignore
import { calculateTimeseriesInterval } from '../calculate_timeseries_interval';
import { LegacyRequest } from '../../types';
import {
ElasticsearchSource,
ElasticsearchSourceLogstashPipelineVertex,
} from '../../../common/types/es';

export function _vertexStats(
vertex,
vertexStatsBucket,
totalProcessorsDurationInMillis,
timeseriesIntervalInSeconds
vertex: ElasticsearchSourceLogstashPipelineVertex,
vertexStatsBucket: any,
totalProcessorsDurationInMillis: number,
timeseriesIntervalInSeconds: number
) {
const isInput = vertex.plugin_type === 'input';
const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output';
Expand All @@ -27,9 +35,12 @@ export function _vertexStats(

const durationInMillis = vertexStatsBucket.duration_in_millis_total.value;

const inputStats = {};
const processorStats = {};
const eventsProcessedStats = {
const inputStats: any = {};
const processorStats: any = {};
const eventsProcessedStats: {
events_out_per_millisecond: number;
events_in_per_millisecond?: number;
} = {
events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis,
};

Expand Down Expand Up @@ -72,53 +83,59 @@ export function _vertexStats(
* @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
*/
export function _enrichVertexStateWithStatsAggregation(
stateDocument,
vertexStatsAggregation,
vertexId,
timeseriesIntervalInSeconds
stateDocument: ElasticsearchSource,
vertexStatsAggregation: any,
vertexId: string,
timeseriesIntervalInSeconds: number
) {
const logstashState = stateDocument.logstash_state;
const vertices = logstashState.pipeline.representation.graph.vertices;
const vertices = logstashState?.pipeline?.representation?.graph?.vertices;

// First, filter out the vertex we care about
const vertex = vertices.find((v) => v.id === vertexId);
vertex.stats = {};
const vertex = vertices?.find((v) => v.id === vertexId);
if (vertex) {
vertex.stats = {};
}

// Next, iterate over timeseries metrics and attach them to vertex
const timeSeriesBuckets = vertexStatsAggregation.aggregations.timeseries.buckets;
timeSeriesBuckets.forEach((timeSeriesBucket) => {
timeSeriesBuckets.forEach((timeSeriesBucket: any) => {
// each bucket calculates stats for total pipeline CPU time for the associated timeseries
const totalDurationStats = timeSeriesBucket.pipelines.scoped.total_processor_duration_stats;
const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;

const timestamp = timeSeriesBucket.key;

const vertexStatsBucket = timeSeriesBucket.pipelines.scoped.vertices.vertex_id;
const vertexStats = _vertexStats(
vertex,
vertexStatsBucket,
totalProcessorsDurationInMillis,
timeseriesIntervalInSeconds
);
Object.keys(vertexStats).forEach((stat) => {
if (!vertex.stats.hasOwnProperty(stat)) {
vertex.stats[stat] = { data: [] };
}
vertex.stats[stat].data.push([timestamp, vertexStats[stat]]);
});
if (vertex) {
const vertexStats = _vertexStats(
vertex,
vertexStatsBucket,
totalProcessorsDurationInMillis,
timeseriesIntervalInSeconds
);
Object.keys(vertexStats).forEach((stat) => {
if (vertex?.stats) {
if (!vertex.stats.hasOwnProperty(stat)) {
vertex.stats[stat] = { data: [] };
}
vertex.stats[stat].data?.push([timestamp, vertexStats[stat]]);
}
});
}
});

return vertex;
}

export async function getPipelineVertex(
req,
config,
lsIndexPattern,
clusterUuid,
pipelineId,
version,
vertexId
req: LegacyRequest,
config: { get: (key: string) => string | undefined },
lsIndexPattern: string,
clusterUuid: string,
pipelineId: string,
version: { hash: string; firstSeen: string; lastSeen: string },
vertexId: string
) {
checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');

Expand Down

0 comments on commit 2084f2d

Please sign in to comment.