Skip to content

Commit

Permalink
Cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
zbjornson committed Aug 1, 2017
1 parent 8cf0bd9 commit add8f4b
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"plugins": ["prettier"],
"extends": "eslint:recommended",
"env": {
"node": true
"node": true,
"es6": true
},
"parserOptions": {
"ecmaVersion": 2015
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ A prometheus client for node.js that supports histogram, summaries, gauges and c

See the example folder for sample usage. The library does not bundle any web framework; to expose the metrics, just return the output of the registry's `metrics()` function.

#### Usage with Node.js's `cluster` module

Node.js's `cluster` module spawns multiple processes and hands off socket connections to those workers. Returning metrics from a worker's local registry will only reveal that worker's metrics, which is generally undesirable.

To solve this, you can aggregate all of the workers' metrics in the master process. See `example/cluster.js` for sample usage. By default, custom metrics are summed across workers, and each default metric uses a predefined aggregation method chosen for it. To use a different aggregation method, define an `aggregate()` function on the metric registered under that name in the global registry of the *master* process. (See the `AggregatorFactory` in `lib/cluster.js` for details.)

Note: You must use the default global registry in cluster mode.

### API

#### Configuration
Expand Down
29 changes: 29 additions & 0 deletions example/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

// Example: the cluster master forks four workers and serves the metrics
// aggregated from all of them on /cluster_metrics (port 3001). Each worker
// runs the regular example server from ./server.js.

const cluster = require('cluster');
const express = require('express');
const metricsServer = express();
const promCluster = require('../lib/cluster');

if (cluster.isMaster) {
  for (let i = 0; i < 4; i++) {
    cluster.fork();
  }

  metricsServer.get('/cluster_metrics', (req, res) => {
    promCluster.clusterMetrics((err, metrics) => {
      if (err) {
        //eslint-disable-next-line no-console
        console.log(err);
        // Do not answer 200 with an empty body in the Prometheus content
        // type: a scraper would read that as "no metrics" rather than as
        // a failed scrape.
        res.status(500).send('Failed to collect cluster metrics');
        return;
      }
      res.set('Content-Type', promCluster.contentType);
      res.send(metrics);
    });
  });

  metricsServer.listen(3001);
  //eslint-disable-next-line no-console
  console.log(
    'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics'
  );
} else {
  require('./server.js');
}
8 changes: 8 additions & 0 deletions example/server.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const express = require('express');
const cluster = require('cluster');
const server = express();
const register = require('../').register;

Expand Down Expand Up @@ -48,6 +49,13 @@ setInterval(() => {
g.labels('post', '300').inc();
}, 100);

if (cluster.isWorker) {
  // Example of a worker-specific series: every two seconds, bump the
  // counter under a `code` label that carries this worker's cluster id.
  const countForThisWorker = () => {
    c.inc({ code: `worker_${cluster.worker.id}` });
  };
  setInterval(countForThisWorker, 2000);
}

server.get('/metrics', (req, res) => {
res.set('Content-Type', register.contentType);
res.end(register.metrics());
Expand Down
211 changes: 211 additions & 0 deletions lib/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
'use strict';

/**
* In cluster masters, extends Registry class with a `clusterMetrics` method,
* which returns aggregated metrics for all workers.
*
* In cluster workers, listens for and responds to requests for metrics by the
* cluster master.
*/

const cluster = require('cluster');
const Registry = require('./registry');
const util = require('./util');

// `type` of the IPC message the master sends to each worker to request its
// local metrics.
const GET_METRICS_REQ = 'prom-client:getMetricsReq';
// `type` of the IPC message a worker sends back; it carries the originating
// `requestId` and the worker's metrics.
const GET_METRICS_RES = 'prom-client:getMetricsRes';

if (cluster.isMaster) {
let requestCtr = 0; // Concurrency control
const requests = new Map();

// Listener for worker responses to requests for local metrics.
//
// This must be a regular function, not an arrow function: an arrow function
// has no `arguments` binding of its own, so the argument-count check below
// would inspect the enclosing scope instead of this listener's arguments.
cluster.on('message', function(worker, message) {
  // Node.js >= v6.0 calls the listener with (worker, message, handle);
  // earlier versions call it with (message, handle).
  const payload = arguments.length === 2 ? worker : message;

  // The IPC channel is shared with the application, so anything that is not
  // a metrics response (including null/undefined) is ignored.
  if (!payload || payload.type !== GET_METRICS_RES) return;

  const request = requests.get(payload.requestId);
  // Unknown or already finalized request: nothing to add the response to.
  if (!request) return;

  request.responses.push(payload.metrics);
  request.pending--;
  if (request.pending !== 0) return;

  // Last outstanding worker has answered: aggregate and hand the result
  // to the caller of `clusterMetrics`.
  requests.delete(payload.requestId);
  const aggregatedRegistry = Registry.aggregate(request.responses);
  const promString = aggregatedRegistry.metrics();
  request.callback(null, promString);
});

/**
 * Builds an aggregator for one metric reported by several workers.
 *
 * The returned function takes the per-worker copies of a metric, groups
 * their values by label set (as hashed by `util.hashObject`) and collapses
 * each group into a single value with `aggregatorFn`. `help`, `name` and
 * `type` are taken from the first copy; each output value keeps the labels
 * (and `metricName`, if truthy) of the first value in its group.
 * @param {Function} aggregatorFn receives the array of values sharing one
 * label set and returns the aggregated value.
 * @return {Function} aggregator function; returns undefined when given an
 * empty array.
 */
const AggregatorFactory = function(aggregatorFn) {
  return metrics => {
    if (metrics.length === 0) return;

    const template = metrics[0];
    const result = {
      help: template.help,
      name: template.name,
      type: template.type,
      values: []
    };

    // Group every worker's samples by the hash of their label set.
    const groups = new Map();
    metrics.forEach(metric => {
      metric.values.forEach(sample => {
        const hash = util.hashObject(sample.labels);
        const group = groups.get(hash);
        if (group === undefined) {
          groups.set(hash, [sample]);
        } else {
          group.push(sample);
        }
      });
    });

    // Collapse each group to one sample. Groups are never empty because
    // they are only created together with their first sample.
    groups.forEach(samples => {
      const first = samples[0];
      const merged = {
        value: aggregatorFn(samples),
        labels: first.labels
      };
      if (first.metricName) {
        merged.metricName = first.metricName;
      }
      // TODO how do we aggregate timestamps? Average? Omit?
      result.values.push(merged);
    });

    return result;
  };
};

// Adds up the `value` of every sample in a group (0 for an empty group).
const sumOfValues = samples =>
  samples.reduce((total, sample) => total + sample.value, 0);

// Sum of the values reported by all workers.
const SUM = AggregatorFactory(sumOfValues);
// Value of the first sample in the group; the others are ignored.
const FIRST = AggregatorFactory(samples => samples[0].value);
// Returns undefined for any input; `Registry.aggregate` does not register
// a metric whose aggregate is falsy.
const VOID = () => undefined;
// Arithmetic mean of the values reported by all workers.
const AVERAGE = AggregatorFactory(
  samples => sumOfValues(samples) / samples.length
);
// Not enabled yet. Sketches:
// const MIN = AggregatorFactory(v => v.reduce((p, c) => Math.min(p, c.value), Infinity));
// const MAX = AggregatorFactory(v => v.reduce((p, c) => Math.max(p, c.value), -Infinity));

// Merge strategy for each default metric, keyed by metric name. Metrics not
// listed here (user metrics) may define their own `aggregate` method;
// otherwise `Registry.aggregate` falls back to summation.
// TODO These are here for the sake of isolation, but obviously would be
// nicer if they were defined in the metrics/ files.
// TODO Might be nice to have multiple aggregates for some, like the
// eventloop lag, where you might want MIN, MAX and AVERAGE.
const metricMergeFns = {
  // Summed across workers
  nodejs_active_handles_total: SUM,
  nodejs_active_requests_total: SUM,
  nodejs_heap_size_total_bytes: SUM,
  nodejs_heap_size_used_bytes: SUM,
  nodejs_external_memory_bytes: SUM,
  nodejs_heap_space_size_total: SUM,
  nodejs_heap_space_size_used: SUM,
  nodejs_heap_space_size_available: SUM,
  process_cpu_user_seconds_total: SUM,
  process_cpu_system_seconds_total: SUM,
  process_cpu_seconds_total: SUM,
  process_max_fds: SUM,
  process_open_fds: SUM,
  process_resident_memory_bytes: SUM,
  process_virtual_memory_bytes: SUM,
  process_heap_bytes: SUM,

  // Averaged across workers
  nodejs_eventloop_lag_seconds: AVERAGE,

  // Taken from the first worker that reported it
  nodejs_version_info: FIRST,

  // Dropped from the aggregate: don't think there's a sensible aggregate
  process_start_time_seconds: VOID
};

// Attach each strategy to the matching metric in the global registry.
for (const name in metricMergeFns) {
  const registered = Registry.globalRegistry.getSingleMetric(name);
  // Not always defined: some are platform-specific.
  if (!registered) continue;
  registered.aggregate = metricMergeFns[name];
}

/**
 * Gets aggregated metrics for all workers.
 *
 * Sends a metrics request to every worker currently in `cluster.workers`
 * and records the pending request; the master's 'message' listener invokes
 * `callback` once every one of those workers has answered. When there are
 * no workers, `callback` is invoked asynchronously with an empty string,
 * because no response would ever arrive to complete the request.
 * @param {Function} callback (err, metrics) => any
 * @return {undefined} undefined
 */
Registry.prototype.clusterMetrics = function(callback) {
  // Snapshot the worker ids once so the expected number of responses and
  // the set of workers actually asked cannot disagree.
  const workerIds = Object.keys(cluster.workers);

  if (workerIds.length === 0) {
    // Deferred so the callback is asynchronous in every case.
    process.nextTick(() => callback(null, ''));
    return;
  }

  const requestId = requestCtr++;

  requests.set(requestId, {
    responses: [],
    pending: workerIds.length,
    callback
  });

  const req = {
    type: GET_METRICS_REQ,
    requestId
  };
  workerIds.forEach(id => cluster.workers[id].send(req));

  // TODO set a timeout for worker responses.
};

/**
 * Builds a new Registry from the metrics reported by several workers.
 *
 * Metrics are grouped by name. Each group is aggregated with the
 * `aggregate` method of the same-named metric in the global registry when
 * that metric exists and has one; otherwise with the entry in
 * `metricMergeFns` for that name, or by summation if there is none.
 * Groups whose aggregate is falsy are left out of the result.
 * @param {Array} metricsArr Array of metrics, each of which created by
 * `registry.getMetricsAsJSON()`.
 * @return {Registry} aggregated registry.
 */
Registry.aggregate = function(metricsArr) {
  const aggregatedRegistry = new Registry();

  // Collect every worker's copy of a metric under the metric's name.
  const copiesByName = new Map();
  metricsArr.forEach(workerMetrics => {
    workerMetrics.forEach(metric => {
      const copies = copiesByName.get(metric.name);
      if (copies === undefined) {
        copiesByName.set(metric.name, [metric]);
      } else {
        copies.push(metric);
      }
    });
  });

  // Reduce each group to a single metric and register it.
  copiesByName.forEach((copies, name) => {
    const registered = Registry.globalRegistry.getSingleMetric(name);
    const aggregated =
      registered && registered.aggregate
        ? registered.aggregate(copies)
        : (metricMergeFns[name] || SUM)(copies);

    // VOID aggregator returns undefined
    if (!aggregated) return;

    // Give the plain aggregate object a `get` that returns itself before
    // registering it.
    aggregated.get = () => aggregated;
    aggregatedRegistry.registerMetric(aggregated);
  });

  return aggregatedRegistry;
};
}

if (cluster.isWorker) {
  // Respond to master's requests for worker's local metrics.
  process.on('message', message => {
    // The IPC channel is shared with the application, so other messages can
    // arrive here too. Ignore anything that is not a metrics request,
    // including null, which would otherwise throw on `.type`.
    if (!message || message.type !== GET_METRICS_REQ) return;

    // Echo the request id so the master can match this response to the
    // request it belongs to.
    process.send({
      type: GET_METRICS_RES,
      requestId: message.requestId,
      metrics: Registry.globalRegistry.getMetricsAsJSON()
    });
  });
}

// else not a clustered server
// The export is the default global registry. The master branch above adds
// `clusterMetrics` to `Registry.prototype` and `aggregate` to `Registry`.

module.exports = Registry.globalRegistry;

0 comments on commit add8f4b

Please sign in to comment.