From add8f4baaf13087026469b8a813744041a5a03f7 Mon Sep 17 00:00:00 2001 From: Zach Bjornson Date: Sun, 30 Jul 2017 00:36:46 -0700 Subject: [PATCH] Cluster support --- .eslintrc | 3 +- README.md | 8 ++ example/cluster.js | 29 +++++++ example/server.js | 8 ++ lib/cluster.js | 211 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 example/cluster.js create mode 100644 lib/cluster.js diff --git a/.eslintrc b/.eslintrc index 76804890..763dd58a 100644 --- a/.eslintrc +++ b/.eslintrc @@ -2,7 +2,8 @@ "plugins": ["prettier"], "extends": "eslint:recommended", "env": { - "node": true + "node": true, + "es6": true }, "parserOptions": { "ecmaVersion": 2015 diff --git a/README.md b/README.md index b729d92a..60072d56 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,14 @@ A prometheus client for node.js that supports histogram, summaries, gauges and c See example folder for a sample usage. The library does not bundle any web framework, to expose the metrics just return the metrics() function in the registry. +#### Usage with Node.js's `cluster` module + +Node.js's `cluster` module spawns multiple processes and hands off socket connections to those workers. Returning metrics from a worker's local registry will only reveal that worker's metrics, which is generally undesirable. + +To solve this, you can aggregate all of the workers' metrics in the master process. See example folder for a sample usage. By default, custom metrics are summed across workers, and default metrics use sensible defaults. To use a different aggregation method, define an `aggregate()` function for the metric's name in the global registry in the *master* process. (See the `AggregatorFactory` in `lib/cluster.js` for details.) + +Note: You must use the default global registry in cluster mode. + ### API #### Configuration diff --git a/example/cluster.js b/example/cluster.js new file mode 100644 index 00000000..66c50794 --- /dev/null +++ b/example/cluster.js @@ -0,0 +1,29 @@ +'use strict'; + +const cluster = require('cluster'); +const express = require('express'); +const metricsServer = express(); +const promCluster = require('../lib/cluster'); + +if (cluster.isMaster) { + for (let i = 0; i < 4; i++) { + cluster.fork(); + } + + metricsServer.get('/cluster_metrics', (req, res) => { + promCluster.clusterMetrics((err, metrics) => { + //eslint-disable-next-line no-console + if (err) console.log(err); + res.set('Content-Type', promCluster.contentType); + res.send(metrics); + }); + }); + + metricsServer.listen(3001); + //eslint-disable-next-line no-console + console.log( + 'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics' + ); +} else { + require('./server.js'); +} diff --git a/example/server.js b/example/server.js index 65e9aac3..19aa9c14 100644 --- a/example/server.js +++ b/example/server.js @@ -1,6 +1,7 @@ 'use strict'; const express = require('express'); +const cluster = require('cluster'); const server = express(); const register = require('../').register; @@ -48,6 +49,13 @@ setInterval(() => { g.labels('post', '300').inc(); }, 100); +if (cluster.isWorker) { + // Expose some worker-specific metric as an example + setInterval(() => { + c.inc({ code: `worker_${cluster.worker.id}` }); + }, 2000); +} + server.get('/metrics', (req, res) => { res.set('Content-Type', register.contentType); res.end(register.metrics()); diff --git a/lib/cluster.js b/lib/cluster.js new file mode 100644 index 00000000..4d9f2a8d --- /dev/null +++ b/lib/cluster.js @@ -0,0 +1,211 @@ +'use strict'; + +/** + * In cluster masters, extends Registry class with a `clusterMetrics` method, + * which returns aggregated metrics for all workers. + * + * In cluster workers, listens for and responds to requests for metrics by the + * cluster master. + */ + +const cluster = require('cluster'); +const Registry = require('./registry'); +const util = require('./util'); + +const GET_METRICS_REQ = 'prom-client:getMetricsReq'; +const GET_METRICS_RES = 'prom-client:getMetricsRes'; + +if (cluster.isMaster) { + let requestCtr = 0; // Concurrency control + const requests = new Map(); + + // Listener for worker responses to requests for local metrics + cluster.on('message', (worker, message) => { + if (arguments.length === 2) { + // pre-Node.js v6.0 + message = worker; + worker = undefined; + } + + if (message.type === GET_METRICS_RES) { + const request = requests.get(message.requestId); + request.responses.push(message.metrics); + request.pending--; + + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + const aggregatedRegistry = Registry.aggregate(request.responses); + const promString = aggregatedRegistry.metrics(); + request.callback(null, promString); + } + } + }); + + /** + * Returns a new function that applies the `aggregatorFn` to the values. + * @param {Function} aggregatorFn function to apply to values. + * @return {Function} aggregator function + */ + const AggregatorFactory = function(aggregatorFn) { + return metrics => { + if (metrics.length === 0) return; + const result = { + help: metrics[0].help, + name: metrics[0].name, + type: metrics[0].type, + values: [] + }; + const byLabels = new Map(); + metrics.forEach(metric => { + metric.values.forEach(value => { + const key = util.hashObject(value.labels); + if (byLabels.has(key)) { + byLabels.get(key).push(value); + } else { + byLabels.set(key, [value]); + } + }); + }); + byLabels.forEach(values => { + if (values.length === 0) return; + const value = aggregatorFn(values); + const valObj = { + value, + labels: values[0].labels + }; + if (values[0].metricName) { + valObj.metricName = values[0].metricName; + } + // TODO how do we aggregate timestamps? Average? Omit? + result.values.push(valObj); + }); + return result; + }; + }; + + const SUM = AggregatorFactory(v => v.reduce((p, c) => p + c.value, 0)); + const FIRST = AggregatorFactory(v => v[0].value); + const VOID = () => {}; + const AVERAGE = AggregatorFactory( + v => v.reduce((p, c) => p + c.value, 0) / v.length + ); + // const MIN = AggregatorFactory(v => v.reduce((p, c) => Math.min(p + c.value), 0)); + // const MAX = AggregatorFactory(v => v.reduce((p, c) => Math.max(p + c.value), 0)); + + // Define merge strategies for all default metrics. User metrics may + // define an `aggregate` method, or will be summed by default. + // TODO These are here for the sake of isolation, but obviously would be + // nicer if they were defined in the metrics/ files. + // TODO Might be nice to have multiple aggregates for some, like the + // eventloop lag, where you might want MIN, MAX and AVERAGE. + const metricMergeFns = { + nodejs_active_handles_total: SUM, + nodejs_eventloop_lag_seconds: AVERAGE, + nodejs_heap_size_total_bytes: SUM, + nodejs_heap_size_used_bytes: SUM, + nodejs_external_memory_bytes: SUM, + nodejs_heap_space_size_total: SUM, + nodejs_heap_space_size_used: SUM, + nodejs_heap_space_size_available: SUM, + process_cpu_user_seconds_total: SUM, + process_cpu_system_seconds_total: SUM, + process_cpu_seconds_total: SUM, + process_max_fds: SUM, + process_open_fds: SUM, + nodejs_active_requests_total: SUM, + process_start_time_seconds: VOID, // don't think there's a sensible aggregate + nodejs_version_info: FIRST, + process_resident_memory_bytes: SUM, + process_virtual_memory_bytes: SUM, + process_heap_bytes: SUM + }; + for (const metricName in metricMergeFns) { + const metric = Registry.globalRegistry.getSingleMetric(metricName); + // Not always defined: some are platform-specific. + if (metric) metric.aggregate = metricMergeFns[metricName]; + } + + /** + * Gets aggregated metrics for all workers. + * @param {Function} callback (err, metrics) => any + * @return {undefined} undefined + */ + Registry.prototype.clusterMetrics = function(callback) { + const requestId = requestCtr++; + + requests.set(requestId, { + responses: [], + pending: Object.keys(cluster.workers).length, + callback + }); + + const req = { + type: GET_METRICS_REQ, + requestId + }; + for (const id in cluster.workers) cluster.workers[id].send(req); + + // TODO set a timeout for worker responses. + }; + + /** + * Creates a new Registry instance from an array of metrics that were + * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using + * their `aggregate` method, or by summation if that is undefined. + * @param {Array} metricsArr Array of metrics, each of which created by + * `registry.getMetricsAsJSON()`. + * @return {Registry} aggregated registry. + */ + Registry.aggregate = function(metricsArr) { + const aggregatedRegistry = new Registry(); + const metricsByName = new Map(); + + // Gather by name + metricsArr.forEach(metrics => { + metrics.forEach(metric => { + if (metricsByName.has(metric.name)) { + metricsByName.get(metric.name).push(metric); + } else { + metricsByName.set(metric.name, [metric]); + } + }); + }); + + // Aggregate gathered metrics. Default to summation. + metricsByName.forEach((metrics, metricName) => { + const metric = Registry.globalRegistry.getSingleMetric(metricName); + let aggregatedMetric; + if (metric && metric.aggregate) { + aggregatedMetric = metric.aggregate(metrics); + } else { + const aggregate = metricMergeFns[metricName] || SUM; + aggregatedMetric = aggregate(metrics); + } + if (aggregatedMetric) { + // VOID aggregator returns undefined + aggregatedMetric.get = () => aggregatedMetric; + aggregatedRegistry.registerMetric(aggregatedMetric); + } + }); + + return aggregatedRegistry; + }; +} + +if (cluster.isWorker) { + // Respond to master's requests for worker's local metrics. + process.on('message', message => { + if (message.type === GET_METRICS_REQ) { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + metrics: Registry.globalRegistry.getMetricsAsJSON() + }); + } + }); +} + +// else not a clustered server + +module.exports = Registry.globalRegistry;