[Monitoring] Support shipping directly to the monitoring cluster (#57022) (#57734)

* Support shipping directly to the monitoring cluster

* Add timestamp

* PR feedback

* Use utc
chrisronline authored Feb 14, 2020
1 parent 133b1ed commit 63aa613
Showing 8 changed files with 233 additions and 18 deletions.
2 changes: 1 addition & 1 deletion x-pack/legacy/plugins/monitoring/common/constants.ts
@@ -17,7 +17,7 @@ export const KIBANA_MONITORING_LOGGING_TAG = 'kibana-monitoring';
* The Monitoring API version is the expected API format that we export and expect to import.
* @type {string}
*/
export const MONITORING_SYSTEM_API_VERSION = '6';
export const MONITORING_SYSTEM_API_VERSION = '7';
/**
* The type name used within the Monitoring index to publish Kibana ops stats.
* @type {string}
41 changes: 41 additions & 0 deletions x-pack/legacy/plugins/monitoring/config.js
@@ -84,6 +84,47 @@ export const config = Joi => {
interval: Joi.number().default(10000), // op status metrics get buffered at `ops.interval` and flushed to the bulk endpoint at this interval
}).default(),
}).default(),
elasticsearch: Joi.object({
customHeaders: Joi.object().default({}),
logQueries: Joi.boolean().default(false),
requestHeadersWhitelist: Joi.array()
.items()
.single()
.default(DEFAULT_REQUEST_HEADERS),
sniffOnStart: Joi.boolean().default(false),
sniffInterval: Joi.number()
.allow(false)
.default(false),
sniffOnConnectionFault: Joi.boolean().default(false),
hosts: Joi.array()
.items(Joi.string().uri({ scheme: ['http', 'https'] }))
.single(), // if empty, use Kibana's connection config
username: Joi.string(),
password: Joi.string(),
requestTimeout: Joi.number().default(30000),
pingTimeout: Joi.number().default(30000),
ssl: Joi.object({
verificationMode: Joi.string()
.valid('none', 'certificate', 'full')
.default('full'),
certificateAuthorities: Joi.array()
.single()
.items(Joi.string()),
certificate: Joi.string(),
key: Joi.string(),
keyPassphrase: Joi.string(),
keystore: Joi.object({
path: Joi.string(),
password: Joi.string(),
}).default(),
truststore: Joi.object({
path: Joi.string(),
password: Joi.string(),
}).default(),
alwaysPresentCertificate: Joi.boolean().default(false),
}).default(),
apiVersion: Joi.string().default('master'),
}).default(),
cluster_alerts: Joi.object({
enabled: Joi.boolean().default(true),
email_notifications: Joi.object({
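For reference, a deployment that ships Kibana monitoring data straight to a dedicated monitoring cluster would end up with a validated monitoring.elasticsearch object shaped roughly like the sketch below. Field names and defaults follow the Joi schema above; the host and credentials are made-up examples, not values from this commit.

// Illustrative only: example values for the new monitoring.elasticsearch block.
const exampleMonitoringEsConfig = {
  hosts: ['https://monitoring-cluster.example.com:9200'], // if empty, Kibana's own connection config is reused
  username: 'kibana_monitoring_shipper', // hypothetical credentials
  password: 'changeme',
  requestTimeout: 30000, // default
  pingTimeout: 30000, // default
  sniffOnStart: false, // default
  ssl: {
    verificationMode: 'full', // one of 'none' | 'certificate' | 'full'
    alwaysPresentCertificate: false,
  },
  apiVersion: 'master', // default
};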
7 changes: 5 additions & 2 deletions x-pack/legacy/plugins/monitoring/index.ts
@@ -44,6 +44,8 @@ const validConfigOptions: string[] = [
'monitoring.ui.container.logstash.enabled',
'monitoring.tests.cloud_detector.enabled',
'monitoring.kibana.collection.interval',
'monitoring.elasticsearch.hosts',
'monitoring.elasticsearch',
'monitoring.ui.elasticsearch.hosts',
'monitoring.ui.elasticsearch',
'monitoring.xpack_api_polling_frequency_millis',
@@ -78,7 +80,7 @@ export const monitoring = (kibana: LegacyPluginApi): LegacyPluginSpec => {
uiExports: getUiExports(),
deprecations,

init(server: Server) {
async init(server: Server) {
const serverConfig = server.config();
const { getOSInfo, plugins, injectUiAppVars } = server as typeof server & { getOSInfo?: any };
const log = (...args: Parameters<typeof server.log>) => server.log(...args);
@@ -128,7 +130,8 @@ export const monitoring = (kibana: LegacyPluginApi): LegacyPluginSpec => {
},
};

new Plugin().setup(coreSetup, pluginsSetup, __LEGACY);
const plugin = new Plugin();
await plugin.setup(coreSetup, pluginsSetup, __LEGACY);
},

postInit(server: Server) {
x-pack/legacy/plugins/monitoring/server/es_client/parse_elasticsearch_config.ts
@@ -16,10 +16,13 @@ const KEY = 'monitoring.ui.elasticsearch';
* TODO: this code can be removed when this plugin is migrated to the Kibana Platform,
* at that point the ElasticsearchClient and ElasticsearchConfig should be used instead
*/
export const parseElasticsearchConfig = (config: any) => {
const es = config.get(KEY);
export const parseElasticsearchConfig = (config: any, configKey: string = KEY) => {
const es = config.get(configKey);
if (!es) {
return {};
}

const errorPrefix = `[config validation of [${KEY}].ssl]`;
const errorPrefix = `[config validation of [${configKey}].ssl]`;
if (es.ssl?.key && es.ssl?.keystore?.path) {
throw new Error(`${errorPrefix}: cannot use [key] when [keystore.path] is specified`);
}
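With the new configKey parameter the same parser now serves both connection blocks. The sketch below shows the two call sites this commit relies on; the helper name getMonitoringEsConfigs is hypothetical, the import path is the one used from server/plugin.js, and config is the legacy server config returned by server.config().

import { parseElasticsearchConfig } from './es_client/parse_elasticsearch_config';

// Hypothetical helper, only to show both call sites side by side.
export function getMonitoringEsConfigs(config) {
  // Default key: the existing UI connection settings under 'monitoring.ui.elasticsearch'.
  const uiEsConfig = parseElasticsearchConfig(config);

  // New key: the dedicated shipping connection under 'monitoring.elasticsearch',
  // as read by the BulkUploader further down; returns {} when the block is absent.
  const directEsConfig = parseElasticsearchConfig(config, 'monitoring.elasticsearch');

  return { uiEsConfig, directEsConfig };
}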
x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js
@@ -6,8 +6,10 @@

import { noop } from 'lodash';
import sinon from 'sinon';
import moment from 'moment';
import expect from '@kbn/expect';
import { BulkUploader } from '../bulk_uploader';
import { MONITORING_SYSTEM_API_VERSION } from '../../../common/constants';

const FETCH_INTERVAL = 300;
const CHECK_DELAY = 500;
@@ -52,6 +54,9 @@ describe('BulkUploader', () => {

server = {
log: sinon.spy(),
config: {
get: sinon.spy(),
},
elasticsearchPlugin: {
createCluster: () => cluster,
getCluster: () => cluster,
@@ -307,5 +312,92 @@ describe('BulkUploader', () => {
done();
}, CHECK_DELAY);
});

it('uses a direct connection to the monitoring cluster, when configured', done => {
const dateInIndex = '2020.02.10';
const oldNow = moment.now;
moment.now = () => 1581310800000;
const prodClusterUuid = '1sdfd5';
const prodCluster = {
callWithInternalUser: sinon
.stub()
.withArgs('monitoring.bulk')
.callsFake(arg => {
let resolution = null;
if (arg === 'info') {
resolution = { cluster_uuid: prodClusterUuid };
}
return new Promise(resolve => resolve(resolution));
}),
};
const monitoringCluster = {
callWithInternalUser: sinon
.stub()
.withArgs('bulk')
.callsFake(() => {
return new Promise(resolve => setTimeout(resolve, CHECK_DELAY + 1));
}),
};

const collectorFetch = sinon.stub().returns({
type: 'kibana_stats',
result: { type: 'kibana_stats', payload: { testData: 12345 } },
});

const collectors = new MockCollectorSet(server, [
{
fetch: collectorFetch,
isReady: () => true,
formatForBulkUpload: result => result,
isUsageCollector: false,
},
]);
const customServer = {
...server,
elasticsearchPlugin: {
createCluster: () => monitoringCluster,
getCluster: name => {
if (name === 'admin' || name === 'data') {
return prodCluster;
}
return monitoringCluster;
},
},
config: {
get: key => {
if (key === 'monitoring.elasticsearch') {
return {
hosts: ['http://localhost:9200'],
username: 'tester',
password: 'testing',
ssl: {},
};
}
return null;
},
},
};
const kbnServerStatus = { toJSON: () => ({ overall: { state: 'green' } }) };
const kbnServerVersion = 'master';
const uploader = new BulkUploader({
...customServer,
interval: FETCH_INTERVAL,
kbnServerStatus,
kbnServerVersion,
});
uploader.start(collectors);
setTimeout(() => {
uploader.stop();
const firstCallArgs = monitoringCluster.callWithInternalUser.firstCall.args;
expect(firstCallArgs[0]).to.be('bulk');
expect(firstCallArgs[1].body[0].index._index).to.be(
`.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateInIndex}`
);
expect(firstCallArgs[1].body[1].type).to.be('kibana_stats');
expect(firstCallArgs[1].body[1].cluster_uuid).to.be(prodClusterUuid);
moment.now = oldNow;
done();
}, CHECK_DELAY);
});
});
});
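A quick sanity check on the frozen clock used in the new test, with values taken straight from it (assumes moment is installed):

// Why the test expects dateInIndex to be '2020.02.10': the stubbed moment.now
// returns an epoch that is 2020-02-10T05:00:00Z, and the formatter added in this
// commit builds the index suffix with moment.utc() (hence the "Use utc" fix-up).
const moment = require('moment');
const frozenNow = 1581310800000; // what the stubbed moment.now returns
console.log(moment.utc(frozenNow).format('YYYY.MM.DD')); // prints 2020.02.10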
x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js
@@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { defaultsDeep, uniq, compact } from 'lodash';
import { defaultsDeep, uniq, compact, get } from 'lodash';
import { callClusterFactory } from '../../../xpack_main';

import {
@@ -14,6 +14,8 @@
} from '../../common/constants';

import { sendBulkPayload, monitoringBulk, getKibanaInfoForStats } from './lib';
import { parseElasticsearchConfig } from '../es_client/parse_elasticsearch_config';
import { hasMonitoringCluster } from '../es_client/instantiate_client';

const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];

@@ -39,6 +41,8 @@ export class BulkUploader {
throw new Error('interval number of milliseconds is required');
}

this._hasDirectConnectionToMonitoringCluster = false;
this._productionClusterUuid = null;
this._timer = null;
// Hold sending and fetching usage until monitoring.bulk is successful. This means that we
// send usage data on the second tick. But would save a lot of bandwidth fetching usage on
@@ -60,6 +64,19 @@ export class BulkUploader {
plugins: [monitoringBulk],
});

const directConfig = parseElasticsearchConfig(config, 'monitoring.elasticsearch');
if (hasMonitoringCluster(directConfig)) {
this._log.info(`Detected direct connection to monitoring cluster`);
this._hasDirectConnectionToMonitoringCluster = true;
this._cluster = elasticsearchPlugin.createCluster('monitoring-direct', directConfig);
elasticsearchPlugin
.getCluster('admin')
.callWithInternalUser('info')
.then(data => {
this._productionClusterUuid = get(data, 'cluster_uuid');
});
}

this._callClusterWithInternalUser = callClusterFactory({
plugins: { elasticsearch: elasticsearchPlugin },
}).getCallClusterInternal();
@@ -151,7 +168,6 @@

const data = await usageCollection.bulkFetch(this._callClusterWithInternalUser);
const payload = this.toBulkUploadFormat(compact(data), usageCollection);

if (payload) {
try {
this._log.debug(`Uploading bulk stats payload to the local cluster`);
@@ -182,7 +198,14 @@
}

async _onPayload(payload) {
return await sendBulkPayload(this._cluster, this._interval, payload);
return await sendBulkPayload(
this._cluster,
this._interval,
payload,
this._log,
this._hasDirectConnectionToMonitoringCluster,
this._productionClusterUuid
);
}

/*
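hasMonitoringCluster is imported from es_client/instantiate_client and its body is not part of this diff. A plausible sketch, assuming the check only asks whether dedicated hosts were configured (per the "if empty, use Kibana's connection config" comment in config.js):

// Hypothetical sketch only; the real implementation lives in
// server/es_client/instantiate_client.js and is not shown in this commit.
const hasMonitoringCluster = esConfig =>
  Boolean(esConfig && esConfig.hosts && esConfig.hosts[0]);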
x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js
@@ -3,13 +3,64 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { chunk, get } from 'lodash';
import {
MONITORING_SYSTEM_API_VERSION,
KIBANA_SYSTEM_ID,
KIBANA_STATS_TYPE_MONITORING,
KIBANA_SETTINGS_TYPE,
} from '../../../common/constants';

import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants';
const SUPPORTED_TYPES = [KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE];
export function formatForNormalBulkEndpoint(payload, productionClusterUuid) {
const dateSuffix = moment.utc().format('YYYY.MM.DD');
return chunk(payload, 2).reduce((accum, chunk) => {
const type = get(chunk[0], 'index._type');
if (!type || !SUPPORTED_TYPES.includes(type)) {
return accum;
}

const { timestamp } = chunk[1];

accum.push({
index: {
_index: `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateSuffix}`,
},
});
accum.push({
[type]: chunk[1],
type,
timestamp,
cluster_uuid: productionClusterUuid,
});
return accum;
}, []);
}

/*
* Send the Kibana usage data to the ES Monitoring Bulk endpoint
*/
export function sendBulkPayload(cluster, interval, payload) {
export async function sendBulkPayload(
cluster,
interval,
payload,
log,
hasDirectConnectionToMonitoringCluster = false,
productionClusterUuid = null
) {
if (hasDirectConnectionToMonitoringCluster) {
if (productionClusterUuid === null) {
log.warn(
`Unable to determine production cluster uuid to use for shipping monitoring data. Kibana monitoring data will appear in a standalone cluster in the Stack Monitoring UI.`
);
}
const formattedPayload = formatForNormalBulkEndpoint(payload, productionClusterUuid);
return await cluster.callWithInternalUser('bulk', {
body: formattedPayload,
});
}

return cluster.callWithInternalUser('monitoring.bulk', {
system_id: KIBANA_SYSTEM_ID,
system_api_version: MONITORING_SYSTEM_API_VERSION,
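A worked example of the reshaping done by formatForNormalBulkEndpoint, reusing the values from the BulkUploader test above. The document body is illustrative; the index name, the nesting under the type, and the cluster_uuid field follow this diff.

// Input: one metric in the monitoring.bulk ("system API") pair format the uploader
// already produces: an action line carrying the type, followed by the document.
const monitoringBulkPayload = [
  { index: { _type: 'kibana_stats' } },
  { timestamp: '2020-02-10T05:00:00.000Z', payload: { testData: 12345 } },
];

// Result of formatForNormalBulkEndpoint(monitoringBulkPayload, '1sdfd5'): a plain
// _bulk body aimed at a dated .monitoring-kibana-7-* index, with the document
// nested under its type and stamped with the production cluster uuid.
const normalBulkBody = [
  { index: { _index: '.monitoring-kibana-7-2020.02.10' } },
  {
    kibana_stats: { timestamp: '2020-02-10T05:00:00.000Z', payload: { testData: 12345 } },
    type: 'kibana_stats',
    timestamp: '2020-02-10T05:00:00.000Z',
    cluster_uuid: '1sdfd5',
  },
];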
16 changes: 9 additions & 7 deletions x-pack/legacy/plugins/monitoring/server/plugin.js
@@ -19,7 +19,7 @@ import { getLicenseExpiration } from './alerts/license_expiration';
import { parseElasticsearchConfig } from './es_client/parse_elasticsearch_config';

export class Plugin {
setup(_coreSetup, pluginsSetup, __LEGACY) {
async setup(_coreSetup, pluginsSetup, __LEGACY) {
const {
plugins,
_kbnServer: kbnServer,
@@ -59,6 +59,14 @@
*/
const elasticsearchConfig = parseElasticsearchConfig(config);

// Create the dedicated client
await instantiateClient({
log,
events,
elasticsearchConfig,
elasticsearchPlugin: plugins.elasticsearch,
});

xpackMainPlugin.status.once('green', async () => {
// first time xpack_main turns green
/*
@@ -67,12 +75,6 @@
const uiEnabled = config.get('monitoring.ui.enabled');

if (uiEnabled) {
await instantiateClient({
log,
events,
elasticsearchConfig,
elasticsearchPlugin: plugins.elasticsearch,
}); // Instantiate the dedicated ES client
await initMonitoringXpackInfo({
config,
log,
