-
Notifications
You must be signed in to change notification settings - Fork 378
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support aggregating metrics from a cluster
- Loading branch information
Showing
17 changed files
with
643 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
'use strict'; | ||
|
||
const cluster = require('cluster'); | ||
const express = require('express'); | ||
const metricsServer = express(); | ||
const AggregatorRegistry = require('../').AggregatorRegistry; | ||
const aggregatorRegistry = new AggregatorRegistry(); | ||
|
||
if (cluster.isMaster) { | ||
for (let i = 0; i < 4; i++) { | ||
cluster.fork(); | ||
} | ||
|
||
metricsServer.get('/cluster_metrics', (req, res) => { | ||
aggregatorRegistry.clusterMetrics((err, metrics) => { | ||
if (err) console.log(err); | ||
res.set('Content-Type', aggregatorRegistry.contentType); | ||
res.send(metrics); | ||
}); | ||
}); | ||
|
||
metricsServer.listen(3001); | ||
console.log( | ||
'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics' | ||
); | ||
} else { | ||
require('./server.js'); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
'use strict'; | ||
|
||
/** | ||
* Extends the Registry class with a `clusterMetrics` method that returns | ||
* aggregated metrics for all workers. | ||
* | ||
* In cluster workers, listens for and responds to requests for metrics by the | ||
* cluster master. | ||
*/ | ||
|
||
const cluster = require('cluster'); | ||
const Registry = require('./registry'); | ||
const util = require('./util'); | ||
const aggregators = require('./metricAggregators').aggregators; | ||
|
||
const GET_METRICS_REQ = 'prom-client:getMetricsReq'; | ||
const GET_METRICS_RES = 'prom-client:getMetricsRes'; | ||
|
||
let requestCtr = 0; // Concurrency control | ||
const requests = new Map(); // Pending requests for workers' local metrics. | ||
|
||
class AggregatorRegistry extends Registry { | ||
/** | ||
* Gets aggregated metrics for all workers. The optional callback and | ||
* returned Promise resolve with the same value; either may be used. | ||
* @param {Function} callback? (err, metrics) => any | ||
* @return {Promise<string>} Promise that resolves with the aggregated | ||
* metrics. | ||
*/ | ||
clusterMetrics(callback) { | ||
const requestId = requestCtr++; | ||
|
||
callback = callback || function() {}; | ||
|
||
return new Promise((resolve, reject) => { | ||
const request = { | ||
responses: [], | ||
pending: Object.keys(cluster.workers).length, | ||
callback, | ||
resolve, | ||
reject, | ||
errorTimeout: setTimeout(() => { | ||
request.failed = true; | ||
const err = new Error('Operation timed out.'); | ||
request.callback(err); | ||
reject(err); | ||
}, 5000), | ||
failed: false | ||
}; | ||
requests.set(requestId, request); | ||
|
||
const message = { | ||
type: GET_METRICS_REQ, | ||
requestId | ||
}; | ||
for (const id in cluster.workers) cluster.workers[id].send(message); | ||
}); | ||
} | ||
|
||
/** | ||
* Creates a new Registry instance from an array of metrics that were | ||
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using | ||
* the method specified by their `aggregator` property, or by summation if | ||
* `aggregator` is undefined. | ||
* @param {Array} metricsArr Array of metrics, each of which created by | ||
* `registry.getMetricsAsJSON()`. | ||
* @return {Registry} aggregated registry. | ||
*/ | ||
static aggregate(metricsArr) { | ||
const aggregatedRegistry = new Registry(); | ||
const metricsByName = new util.Grouper(); | ||
|
||
// Gather by name | ||
metricsArr.forEach(metrics => { | ||
metrics.forEach(metric => { | ||
metricsByName.add(metric.name, metric); | ||
}); | ||
}); | ||
|
||
// Aggregate gathered metrics. Default to summation. | ||
metricsByName.forEach(metrics => { | ||
const aggregatorName = metrics[0].aggregator || 'sum'; | ||
const aggregatorFn = aggregators[aggregatorName]; | ||
if (typeof aggregatorFn !== 'function') { | ||
throw new Error(`'${aggregatorName}' is not a defined aggregator.`); | ||
} | ||
const aggregatedMetric = aggregatorFn(metrics); | ||
// NB: The 'omit' aggregator returns undefined. | ||
if (aggregatedMetric) { | ||
const aggregatedMetricWrapper = Object.assign( | ||
{ | ||
get: () => aggregatedMetric | ||
}, | ||
aggregatedMetric | ||
); | ||
aggregatedRegistry.registerMetric(aggregatedMetricWrapper); | ||
} | ||
}); | ||
|
||
return aggregatedRegistry; | ||
} | ||
} | ||
|
||
if (cluster.isMaster) { | ||
// Listen for worker responses to requests for local metrics | ||
cluster.on('message', (worker, message) => { | ||
if (arguments.length === 2) { | ||
// pre-Node.js v6.0 | ||
message = worker; | ||
worker = undefined; | ||
} | ||
|
||
if (message.type === GET_METRICS_RES) { | ||
const request = requests.get(message.requestId); | ||
request.responses.push(message.metrics); | ||
request.pending--; | ||
|
||
if (request.pending === 0) { | ||
// finalize | ||
requests.delete(message.requestId); | ||
clearTimeout(request.errorTimeout); | ||
|
||
if (request.failed) return; // Callback already run with Error. | ||
|
||
const registry = AggregatorRegistry.aggregate(request.responses); | ||
const promString = registry.metrics(); | ||
request.callback(null, promString); | ||
request.resolve(promString); | ||
} | ||
} | ||
}); | ||
} else if (cluster.isWorker) { | ||
// Respond to master's requests for worker's local metrics. | ||
process.on('message', message => { | ||
if (message.type === GET_METRICS_REQ) { | ||
process.send({ | ||
type: GET_METRICS_RES, | ||
requestId: message.requestId, | ||
// TODO see if we can support the non-global registry also. | ||
metrics: Registry.globalRegistry.getMetricsAsJSON() | ||
}); | ||
} | ||
}); | ||
} | ||
|
||
module.exports = AggregatorRegistry; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.