Skip to content

Commit

Permalink
WIP: support aggregating metrics in clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
zbjornson committed Aug 7, 2017
1 parent 8cf0bd9 commit 626ed92
Show file tree
Hide file tree
Showing 16 changed files with 603 additions and 7 deletions.
9 changes: 8 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"plugins": ["prettier"],
"extends": "eslint:recommended",
"env": {
"node": true
"node": true,
"es6": true
},
"parserOptions": {
"ecmaVersion": 2015
Expand Down Expand Up @@ -55,6 +56,12 @@
"no-shadow": "off",
"no-unused-expressions": "off"
}
},
{
"files": ["example/**/*.js"],
"rules": {
"no-console": "off"
}
}
]
}
18 changes: 16 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,17 @@ A prometheus client for node.js that supports histogram, summaries, gauges and c

### Usage

See example folder for a sample usage. The library does not bundle any web framework, to expose the metrics just return the metrics() function in the registry.
See example folder for a sample usage. The library does not bundle any web framework, to expose the metrics just return the `metrics()` function in the registry.

#### Usage with Node.js's `cluster` module

Node.js's `cluster` module spawns multiple processes and hands off socket connections to those workers. Returning metrics from a worker's local registry will only reveal that individual worker's metrics, which is generally undesirable. To solve this, you can aggregate all of the workers' metrics in the master process. See example folder for a sample usage.

Default metrics use sensible aggregation methods.

Custom metrics are summed across workers by default. To use a different aggregation method, define an `aggregator` property in the metric config. (See `lib/metrics/version.js` for an example.) When using a custom `aggregator`, the metric *must* be registered with the global registry in both the master and worker processes.

Note: You must use the default global registry in cluster mode.

### API

Expand Down Expand Up @@ -248,7 +258,7 @@ You can prevent this by setting last parameter when creating the metric to `fals
Using non-global registries requires creating Registry instance and adding it inside `registers` inside the configuration object. Alternatively
you can pass an empty `registers` array and register it manually.

Registry has a merge function that enables you to expose multiple registries on the same endpoint. If the same metric name exists in both registries, an error will be thrown.
Registry has a `merge` function that enables you to expose multiple registries on the same endpoint. If the same metric name exists in both registries, an error will be thrown.

```js
const client = require('prom-client');
Expand Down Expand Up @@ -278,6 +288,10 @@ If you need to get a reference to a previously registered metric, you can use `r
You can remove all metrics by calling `register.clear()`. You can also remove a single metric by calling
`register.removeSingleMetric(*name of metric*)`.

##### Cluster metrics

You can get aggregated metrics for all worker processes in a cluster by running `register.clusterMetrics((err, metrics) => {...})`, which will be called with an error, or with the aggregated metrics string for prometheus to consume.

#### Pushgateway

It is possible to push metrics via a [Pushgateway](https://github.com/prometheus/pushgateway).
Expand Down
27 changes: 27 additions & 0 deletions example/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';

const cluster = require('cluster');
const express = require('express');
const metricsServer = express();
const AggregatorRegistry = require('../').AggregatorRegistry;

if (cluster.isMaster) {
for (let i = 0; i < 4; i++) {
cluster.fork();
}

metricsServer.get('/cluster_metrics', (req, res) => {
AggregatorRegistry.clusterMetrics((err, metrics) => {
if (err) console.log(err);
res.set('Content-Type', AggregatorRegistry.contentType);
res.send(metrics);
});
});

metricsServer.listen(3001);
console.log(
'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics'
);
} else {
require('./server.js');
}
9 changes: 8 additions & 1 deletion example/server.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const express = require('express');
const cluster = require('cluster');
const server = express();
const register = require('../').register;

Expand Down Expand Up @@ -48,6 +49,13 @@ setInterval(() => {
g.labels('post', '300').inc();
}, 100);

if (cluster.isWorker) {
// Expose some worker-specific metric as an example
setInterval(() => {
c.inc({ code: `worker_${cluster.worker.id}` });
}, 2000);
}

server.get('/metrics', (req, res) => {
res.set('Content-Type', register.contentType);
res.end(register.metrics());
Expand All @@ -61,6 +69,5 @@ server.get('/metrics/counter', (req, res) => {
//Enable collection of default metrics
require('../').collectDefaultMetrics();

//eslint-disable-next-line no-console
console.log('Server listening to 3000, metrics exposed on /metrics endpoint');
server.listen(3000);
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ exports.linearBuckets = require('./lib/bucketGenerators').linearBuckets;
exports.exponentialBuckets = require('./lib/bucketGenerators').exponentialBuckets;

exports.collectDefaultMetrics = require('./lib/defaultMetrics');

exports.AGGREGATORS = require('./lib/metricAggregators').AGGREGATORS;
exports.AggregatorRegistry = require('./lib/cluster');
151 changes: 151 additions & 0 deletions lib/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
'use strict';

/**
* In cluster masters, extends Registry class with a `clusterMetrics` method,
* which returns aggregated metrics for all workers.
*
* In cluster workers, listens for and responds to requests for metrics by the
* cluster master.
*/

const cluster = require('cluster');
const Registry = require('./registry');
const AGGREGATORS = require('./metricAggregators').AGGREGATORS;

const GET_METRICS_REQ = 'prom-client:getMetricsReq';
const GET_METRICS_RES = 'prom-client:getMetricsRes';

// Default metrics that don't use summation:
const DEFAULT_METRIC_AGGREGATORS = {
nodejs_version_info: AGGREGATORS.FIRST,
process_start_time_seconds: AGGREGATORS.OMIT,
nodejs_eventloop_lag_seconds: AGGREGATORS.AVERAGE
};

let requestCtr = 0; // Concurrency control
const requests = new Map(); // Pending requests for workers' local metrics.

class AggregatorRegistry extends Registry {
/**
* Gets aggregated metrics for all workers.
* @param {Function} callback (err, metrics) => any
* @return {undefined} undefined
*/
clusterMetrics(callback) {
const requestId = requestCtr++;

const request = {
responses: [],
pending: Object.keys(cluster.workers).length,
callback,
errorTimeout: setTimeout(() => {
request.failed = true;
request.callback(new Error('Operation timed out.'));
}, 5000),
failed: false
};
requests.set(requestId, request);

const message = {
type: GET_METRICS_REQ,
requestId
};
for (const id in cluster.workers) cluster.workers[id].send(message);
}

/**
* Creates a new Registry instance from an array of metrics that were
* created by `registry.getMetricsAsJSON()`. Metrics registered on the
* globalRegistry are aggregated using their `aggregate` method if defined,
* or by summation if not registered or missing an `aggregate` method.
* @param {Array} metricsArr Array of metrics, each of which created by
* `registry.getMetricsAsJSON()`.
* @return {Registry} aggregated registry.
*/
static aggregate(metricsArr) {
const aggregatedRegistry = new Registry();
const metricsByName = new Map();

// Gather by name
metricsArr.forEach(metrics => {
metrics.forEach(metric => {
if (metricsByName.has(metric.name)) {
metricsByName.get(metric.name).push(metric);
} else {
metricsByName.set(metric.name, [metric]);
}
});
});

// Aggregate gathered metrics. Default to summation.
metricsByName.forEach((metrics, metricName) => {
let aggregatedMetric;
const metric = Registry.globalRegistry.getSingleMetric(metricName);
const defaultMetricAggregator = DEFAULT_METRIC_AGGREGATORS[metricName];
if (metric && metric.aggregate) {
// Metric is in global registry and has aggregator fn
aggregatedMetric = metric.aggregate(metrics);
} else if (defaultMetricAggregator) {
// Metric is not in global registry or has no aggregator fn,
// but is a default metric
aggregatedMetric = defaultMetricAggregator(metrics);
} else {
aggregatedMetric = AGGREGATORS.SUM(metrics);
}
if (aggregatedMetric) {
// OMIT aggregator returns undefined
const aggregatedMetricWrapper = Object.assign(
{
get: () => aggregatedMetric
},
aggregatedMetric
);
aggregatedRegistry.registerMetric(aggregatedMetricWrapper);
}
});

return aggregatedRegistry;
}
}

module.exports = AggregatorRegistry;

if (cluster.isMaster) {
// Listen for worker responses to requests for local metrics
cluster.on('message', (worker, message) => {
if (arguments.length === 2) {
// pre-Node.js v6.0
message = worker;
worker = undefined;
}

if (message.type === GET_METRICS_RES) {
const request = requests.get(message.requestId);
request.responses.push(message.metrics);
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

if (request.failed) return; // Callback already run with Error.

const registry = Registry.aggregate(request.responses);
const promString = registry.metrics();
request.callback(null, promString);
}
}
});
} else if (cluster.isWorker) {
// Respond to master's requests for worker's local metrics.
process.on('message', message => {
if (message.type === GET_METRICS_REQ) {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
metrics: Registry.globalRegistry.getMetricsAsJSON()
});
}
});
}
2 changes: 2 additions & 0 deletions lib/counter.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const validateLabelNames = require('./validation').validateLabelName;
const isObject = require('./util').isObject;
const printDeprecationObjectConstructor = require('./util')
.printDeprecationObjectConstructor;
const AGGREGATORS = require('./metricAggregators').AGGREGATORS;

const getLabels = require('./util').getLabels;

Expand Down Expand Up @@ -72,6 +73,7 @@ class Counter {
}

this.help = config.help;
this.aggregate = config.aggregator || AGGREGATORS.SUM;

config.registers.forEach(registryInstance =>
registryInstance.registerMetric(this)
Expand Down
2 changes: 2 additions & 0 deletions lib/gauge.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const validateLabelNames = require('./validation').validateLabelName;
const isObject = require('./util').isObject;
const printDeprecationObjectConstructor = require('./util')
.printDeprecationObjectConstructor;
const AGGREGATORS = require('./metricAggregators').AGGREGATORS;

class Gauge {
/**
Expand Down Expand Up @@ -69,6 +70,7 @@ class Gauge {
this.hashMap = createValue({}, 0, {});
}
this.help = config.help;
this.aggregate = config.aggregator || AGGREGATORS.SUM;

config.registers.forEach(registryInstance =>
registryInstance.registerMetric(this)
Expand Down
2 changes: 2 additions & 0 deletions lib/histogram.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const validateLabelNames = require('./validation').validateLabelName;
const isObject = require('./util').isObject;
const printDeprecationObjectConstructor = require('./util')
.printDeprecationObjectConstructor;
const AGGREGATORS = require('./metricAggregators').AGGREGATORS;

class Histogram {
/**
Expand Down Expand Up @@ -64,6 +65,7 @@ class Histogram {

this.name = config.name;
this.help = config.help;
this.aggregate = config.aggregator || AGGREGATORS.SUM;

this.upperBounds = config.buckets;
this.bucketValues = this.upperBounds.reduce((acc, upperBound) => {
Expand Down
Loading

0 comments on commit 626ed92

Please sign in to comment.