Skip to content

Commit

Permalink
Support aggregating metrics from a cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
zbjornson committed Aug 28, 2017
1 parent bb4586d commit 4b9218d
Show file tree
Hide file tree
Showing 18 changed files with 677 additions and 24 deletions.
9 changes: 8 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"plugins": ["prettier"],
"extends": "eslint:recommended",
"env": {
"node": true
"node": true,
"es6": true
},
"parserOptions": {
"ecmaVersion": 2015
Expand Down Expand Up @@ -55,6 +56,12 @@
"no-shadow": "off",
"no-unused-expressions": "off"
}
},
{
"files": ["example/**/*.js"],
"rules": {
"no-console": "off"
}
}
]
}
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,17 @@ A prometheus client for node.js that supports histogram, summaries, gauges and c

### Usage

See example folder for a sample usage. The library does not bundle any web framework, to expose the metrics just return the metrics() function in the registry.
See example folder for a sample usage. The library does not bundle any web framework, to expose the metrics just return the `metrics()` function in the registry.

#### Usage with Node.js's `cluster` module

Node.js's `cluster` module spawns multiple processes and hands off socket connections to those workers. Returning metrics from a worker's local registry will only reveal that individual worker's metrics, which is generally undesirable. To solve this, you can aggregate all of the workers' metrics in the master process. See `example/cluster.js` for an example.

Default metrics use sensible aggregation methods. Custom metrics are summed across workers by default. To use a different aggregation method, set the `aggregator` property in the metric config to one of 'sum', 'first', 'min', 'max', 'average' or 'omit'. (See `lib/metrics/version.js` for an example.)

If you need to expose metrics about an individual worker, you can include a value that is unique to the worker (such as the worker ID or process ID) in a label. (See `example/server.js` for an example using `worker_${cluster.worker.id}` as a label value.)

Note: You must use the default global registry in cluster mode.

### API

Expand Down Expand Up @@ -248,7 +258,7 @@ You can prevent this by setting last parameter when creating the metric to `fals
Using non-global registries requires creating Registry instance and adding it inside `registers` inside the configuration object. Alternatively
you can pass an empty `registers` array and register it manually.

Registry has a merge function that enables you to expose multiple registries on the same endpoint. If the same metric name exists in both registries, an error will be thrown.
Registry has a `merge` function that enables you to expose multiple registries on the same endpoint. If the same metric name exists in both registries, an error will be thrown.

```js
const client = require('prom-client');
Expand Down Expand Up @@ -280,6 +290,22 @@ If you need to get a reference to a previously registered metric, you can use `r
You can remove all metrics by calling `register.clear()`. You can also remove a single metric by calling
`register.removeSingleMetric(*name of metric*)`.

##### Cluster metrics

You can get aggregated metrics for all workers in a node.js cluster with `register.clusterMetrics()`. This method both returns a promise and accepts a callback, both of which resolve with a metrics string suitable for Prometheus to consume.

```js
register.clusterMetrics()
.then(metrics => { /* ... */ })
.catch(err => { /* ... */ });

// - or -

register.clusterMetrics((err, metrics) => {
// ...
});
```

#### Pushgateway

It is possible to push metrics via a [Pushgateway](https://github.com/prometheus/pushgateway).
Expand Down
28 changes: 28 additions & 0 deletions example/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict';

const cluster = require('cluster');
const express = require('express');
const metricsServer = express();
const AggregatorRegistry = require('../').AggregatorRegistry;
const aggregatorRegistry = new AggregatorRegistry();

if (cluster.isMaster) {
for (let i = 0; i < 4; i++) {
cluster.fork();
}

metricsServer.get('/cluster_metrics', (req, res) => {
aggregatorRegistry.clusterMetrics((err, metrics) => {
if (err) console.log(err);
res.set('Content-Type', aggregatorRegistry.contentType);
res.send(metrics);
});
});

metricsServer.listen(3001);
console.log(
'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics'
);
} else {
require('./server.js');
}
9 changes: 8 additions & 1 deletion example/server.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const express = require('express');
const cluster = require('cluster');
const server = express();
const register = require('../').register;

Expand Down Expand Up @@ -48,6 +49,13 @@ setInterval(() => {
g.labels('post', '300').inc();
}, 100);

if (cluster.isWorker) {
// Expose some worker-specific metric as an example
setInterval(() => {
c.inc({ code: `worker_${cluster.worker.id}` });
}, 2000);
}

server.get('/metrics', (req, res) => {
res.set('Content-Type', register.contentType);
res.end(register.metrics());
Expand All @@ -61,6 +69,5 @@ server.get('/metrics/counter', (req, res) => {
//Enable collection of default metrics
require('../').collectDefaultMetrics();

//eslint-disable-next-line no-console
console.log('Server listening to 3000, metrics exposed on /metrics endpoint');
server.listen(3000);
34 changes: 34 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,40 @@ export class Registry {
*/
export const register: Registry;

export class AggregatorRegistry extends Registry {
/**
* Gets aggregated metrics for all workers. The optional callback and
* returned Promise resolve with the same value; either may be used.
* @param {Function} callback? (err, metrics) => any
* @return {Promise<string>} Promise that resolves with the aggregated
* metrics.
*/
clusterMetrics(
cb: (err: Error | null, metrics?: string) => any
): Promise<string>;

/**
* Creates a new Registry instance from an array of metrics that were
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
* the method specified by their `aggregator` property, or by summation if
* `aggregator` is undefined.
* @param {Array} metricsArr Array of metrics, each of which created by
* `registry.getMetricsAsJSON()`.
* @return {Registry} aggregated registry.
*/
static aggregate(metricsArr: Array<Object>): Registry;
}

/**
* General metric type
*/
export type Metric = Counter | Gauge | Summary | Histogram;

/**
* Aggregation methods, used for aggregating metrics in a Node.js cluster.
*/
export type Aggregator = 'omit' | 'sum' | 'first' | 'min' | 'max' | 'average';

export enum MetricType {
Counter,
Gauge,
Expand All @@ -89,6 +118,7 @@ interface metric {
name: string;
help: string;
type: MetricType;
aggregator: Aggregator;
}

interface labelValues {
Expand All @@ -100,6 +130,7 @@ export interface CounterConfiguration {
help: string;
labelNames?: string[];
registers?: Registry[];
aggregator?: Aggregator;
}

/**
Expand Down Expand Up @@ -158,6 +189,7 @@ export interface GaugeConfiguration {
help: string;
labelNames?: string[];
registers?: Registry[];
aggregator?: Aggregator;
}

/**
Expand Down Expand Up @@ -285,6 +317,7 @@ export interface HistogramConfiguration {
labelNames?: string[];
buckets?: number[];
registers?: Registry[];
aggregator?: Aggregator;
}

/**
Expand Down Expand Up @@ -378,6 +411,7 @@ export interface SummaryConfiguration {
labelNames?: string[];
percentiles?: number[];
registers?: Registry[];
aggregator?: Aggregator;
}

/**
Expand Down
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ exports.linearBuckets = require('./lib/bucketGenerators').linearBuckets;
exports.exponentialBuckets = require('./lib/bucketGenerators').exponentialBuckets;

exports.collectDefaultMetrics = require('./lib/defaultMetrics');

exports.aggregators = require('./lib/metricAggregators').aggregators;
exports.AggregatorRegistry = require('./lib/cluster');
146 changes: 146 additions & 0 deletions lib/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
'use strict';

/**
* Extends the Registry class with a `clusterMetrics` method that returns
* aggregated metrics for all workers.
*
* In cluster workers, listens for and responds to requests for metrics by the
* cluster master.
*/

const cluster = require('cluster');
const Registry = require('./registry');
const util = require('./util');
const aggregators = require('./metricAggregators').aggregators;

const GET_METRICS_REQ = 'prom-client:getMetricsReq';
const GET_METRICS_RES = 'prom-client:getMetricsRes';

let requestCtr = 0; // Concurrency control
const requests = new Map(); // Pending requests for workers' local metrics.

class AggregatorRegistry extends Registry {
/**
* Gets aggregated metrics for all workers. The optional callback and
* returned Promise resolve with the same value; either may be used.
* @param {Function} callback? (err, metrics) => any
* @return {Promise<string>} Promise that resolves with the aggregated
* metrics.
*/
clusterMetrics(callback) {
const requestId = requestCtr++;

callback = callback || function() {};

return new Promise((resolve, reject) => {
const request = {
responses: [],
pending: Object.keys(cluster.workers).length,
callback,
resolve,
reject,
errorTimeout: setTimeout(() => {
request.failed = true;
const err = new Error('Operation timed out.');
request.callback(err);
reject(err);
}, 5000),
failed: false
};
requests.set(requestId, request);

const message = {
type: GET_METRICS_REQ,
requestId
};
for (const id in cluster.workers) cluster.workers[id].send(message);
});
}

/**
* Creates a new Registry instance from an array of metrics that were
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
* the method specified by their `aggregator` property, or by summation if
* `aggregator` is undefined.
* @param {Array} metricsArr Array of metrics, each of which created by
* `registry.getMetricsAsJSON()`.
* @return {Registry} aggregated registry.
*/
static aggregate(metricsArr) {
const aggregatedRegistry = new Registry();
const metricsByName = new util.Grouper();

// Gather by name
metricsArr.forEach(metrics => {
metrics.forEach(metric => {
metricsByName.add(metric.name, metric);
});
});

// Aggregate gathered metrics. Default to summation.
metricsByName.forEach(metrics => {
const aggregatorName = metrics[0].aggregator || 'sum';
const aggregatorFn = aggregators[aggregatorName];
if (typeof aggregatorFn !== 'function') {
throw new Error(`'${aggregatorName}' is not a defined aggregator.`);
}
const aggregatedMetric = aggregatorFn(metrics);
// NB: The 'omit' aggregator returns undefined.
if (aggregatedMetric) {
const aggregatedMetricWrapper = Object.assign(
{
get: () => aggregatedMetric
},
aggregatedMetric
);
aggregatedRegistry.registerMetric(aggregatedMetricWrapper);
}
});

return aggregatedRegistry;
}
}

if (cluster.isMaster) {
// Listen for worker responses to requests for local metrics
cluster.on('message', (worker, message) => {
if (arguments.length === 2) {
// pre-Node.js v6.0
message = worker;
worker = undefined;
}

if (message.type === GET_METRICS_RES) {
const request = requests.get(message.requestId);
request.responses.push(message.metrics);
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

if (request.failed) return; // Callback already run with Error.

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.callback(null, promString);
request.resolve(promString);
}
}
});
} else if (cluster.isWorker) {
// Respond to master's requests for worker's local metrics.
process.on('message', message => {
if (message.type === GET_METRICS_REQ) {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
// TODO see if we can support the non-global registry also.
metrics: Registry.globalRegistry.getMetricsAsJSON()
});
}
});
}

module.exports = AggregatorRegistry;
4 changes: 3 additions & 1 deletion lib/counter.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class Counter {
}

this.help = config.help;
this.aggregator = config.aggregator || 'sum';

config.registers.forEach(registryInstance =>
registryInstance.registerMetric(this)
Expand Down Expand Up @@ -100,7 +101,8 @@ class Counter {
help: this.help,
name: this.name,
type,
values: getProperties(this.hashMap)
values: getProperties(this.hashMap),
aggregator: this.aggregator
};
}

Expand Down
Loading

0 comments on commit 4b9218d

Please sign in to comment.