Skip to content

Commit

Permalink
feat: generalise the grouped metrics and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Mar 20, 2021
1 parent c1a2388 commit 877ffac
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 137 deletions.
152 changes: 152 additions & 0 deletions packages/cosmic-swingset/lib/kernel-stats.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,147 @@
import { MeterProvider } from '@opentelemetry/metrics';
import { PrometheusExporter } from '@opentelemetry/exporter-prometheus';

import makeStore from '@agoric/store';

import {
KERNEL_STATS_SUM_METRICS,
KERNEL_STATS_UPDOWN_METRICS,
} from '@agoric/swingset-vat/src/kernel/metrics';

export const DEFAULT_METER_PROVIDER = new MeterProvider();

export const HISTOGRAM_SECONDS_LATENCY_BOUNDARIES = [
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1,
2.5,
5,
10,
Infinity,
];

const wrapDeltaMS = (finisher, useDeltaMS) => {
const startMS = Date.now();
return (...finishArgs) => {
try {
return finisher(...finishArgs);
} finally {
const deltaMS = Date.now() - startMS;
useDeltaMS(deltaMS, finishArgs);
}
};
};

const recordToKey = record =>
JSON.stringify(
Object.entries(record).sort(([ka], [kb]) => (ka < kb ? -1 : 1)),
);

export function makeSlogCallbacks({ metricMeter, labels }) {
const nameToBaseMetric = makeStore('baseMetricName');
nameToBaseMetric.init(
'swingset_vat_startup',
metricMeter.createValueRecorder('swingset_vat_startup', {
description: 'Vat startup time (ms)',
}),
);
nameToBaseMetric.init(
'swingset_vat_delivery',
metricMeter.createValueRecorder('swingset_vat_delivery', {
description: 'Vat delivery time (ms)',
}),
);
nameToBaseMetric.init(
'swingset_meter_usage',
metricMeter.createValueRecorder('swingset_meter_usage', {
description: 'Vat meter usage',
}),
);
const groupToMetrics = makeStore('metricGroup');

/**
* This function reuses or creates per-group named metrics.
*
* @param {string} name name of the base metric
* @param {Record<string, string>} [group] the labels to associate with a group
* @param {Record<string, string>} [instance] the specific metric labels
* @returns {any} the labelled metric
*/
const getGroupedMetric = (name, group = {}, instance = {}) => {
let nameToMetric;
const groupKey = recordToKey(group);
const instanceKey = recordToKey(instance);
if (groupToMetrics.has(groupKey)) {
let oldInstanceKey;
[nameToMetric, oldInstanceKey] = groupToMetrics.get(groupKey);
if (instanceKey !== oldInstanceKey) {
for (const metric of nameToMetric.values()) {
// FIXME: Delete all the metrics of the old instance.
metric;
}
// Refresh the metric group.
nameToMetric = makeStore('metricName');
groupToMetrics.set(groupKey, [nameToMetric, instanceKey]);
}
} else {
nameToMetric = makeStore('metricName');
groupToMetrics.init(groupKey, [nameToMetric, instanceKey]);
}

let metric;
if (nameToMetric.has(name)) {
metric = nameToMetric.get(name);
} else {
// Bind the base metric to the group and instance labels.
metric = nameToBaseMetric
.get(name)
.bind({ ...labels, ...group, ...instance });
nameToMetric.init(name, metric);
}
return metric;
};

/**
* Measure some interesting stats. We currently do a per-vat recording of
* time spent in the vat for startup and delivery.
*/
const slogCallbacks = {
startup(_method, [vatID], finisher) {
return wrapDeltaMS(finisher, deltaMS => {
getGroupedMetric('swingset_vat_startup', { vatID }).record(deltaMS);
});
},
delivery(_method, [vatID], finisher) {
return wrapDeltaMS(finisher, (deltaMS, [[_status, _problem, usage]]) => {
getGroupedMetric('swingset_vat_delivery', { vatID }).record(deltaMS);
if (usage) {
// Add to aggregated metering stats.
const group = { vatID };
for (const [key, value] of Object.entries(usage)) {
if (key === 'meterType') {
// eslint-disable-next-line no-continue
continue;
}
getGroupedMetric(`swingset_meter_usage`, group, {
// The meterType is an instance-specific label--a change in it
// will result in the old value being discarded.
...(usage.meterType && { meterType: usage.meterType }),
stat: key,
}).record(value || 0);
}
}
});
},
};

return harden(slogCallbacks);
}

/**
* @param {Object} param0
* @param {any} param0.controller
Expand Down Expand Up @@ -69,6 +205,22 @@ export function exportKernelStats({
});
batchObserverResult.observe(labels, observations);
});

const schedulerCrankTimeHistogram = metricMeter
.createValueRecorder('swingset_crank_processing_time', {
description: 'Processing time per crank (ms)',
boundaries: [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, Infinity],
})
.bind(labels);

const schedulerBlockTimeHistogram = metricMeter
.createValueRecorder('swingset_block_processing_seconds', {
description: 'Processing time per block',
boundaries: HISTOGRAM_SECONDS_LATENCY_BOUNDARIES,
})
.bind(labels);

return { schedulerCrankTimeHistogram, schedulerBlockTimeHistogram };
}

/**
Expand Down
162 changes: 25 additions & 137 deletions packages/cosmic-swingset/lib/launch-chain.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import path from 'path';
import anylogger from 'anylogger';

import { MeterProvider } from '@opentelemetry/metrics';

import {
buildMailbox,
buildMailboxStateMap,
Expand All @@ -14,34 +12,22 @@ import {
loadBasedir,
loadSwingsetConfigFile,
} from '@agoric/swingset-vat';
import { assert, details as X, quote } from '@agoric/assert';
import makeStore from '@agoric/store';
import { assert, details as X } from '@agoric/assert';
import { getBestSwingStore } from './check-lmdb';
import { exportKernelStats } from './kernel-stats';
import {
DEFAULT_METER_PROVIDER,
exportKernelStats,
makeSlogCallbacks,
} from './kernel-stats';

const log = anylogger('launch-chain');
const console = anylogger('launch-chain');

const SWING_STORE_META_KEY = 'cosmos/meta';

// This is how many cranks we run per block, as per #2299.
// TODO Make it dependent upon metering instead.
const FIXME_MAX_CRANKS_PER_BLOCK = 1000;

export const HISTOGRAM_SECONDS_LATENCY_BOUNDARIES = [
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1,
2.5,
5,
10,
Infinity,
];

async function buildSwingset(
mailboxStorage,
bridgeOutbound,
Expand Down Expand Up @@ -97,9 +83,9 @@ export async function launch(
vatsDir,
argv,
debugName = undefined,
meterProvider = new MeterProvider(),
meterProvider = DEFAULT_METER_PROVIDER,
) {
log.info('Launching SwingSet kernel');
console.info('Launching SwingSet kernel');

const tempdir = path.resolve(kernelStateDBDir, 'check-lmdb-tempdir');
const { openSwingStore } = getBestSwingStore(tempdir);
Expand All @@ -110,108 +96,16 @@ export async function launch(
return doOutboundBridge(dstID, obj);
}

const wrapDeltaMS = (finisher, useDeltaMS) => {
const startMS = Date.now();
return (...finishArgs) => {
try {
return finisher(...finishArgs);
} finally {
const deltaMS = Date.now() - startMS;
useDeltaMS(deltaMS, finishArgs);
}
};
};

// Not to be confused with the gas model, this meter is for OpenTelemetry.
const metricMeter = meterProvider.getMeter('ag-chain-cosmos');
const METRIC_LABELS = { app: 'ag-chain-cosmos' };

const nameToBaseMetric = makeStore('metricName');
nameToBaseMetric.init(
'swingset_vat_startup',
metricMeter.createValueRecorder('swingset_vat_startup', {
description: 'Vat startup time (ms)',
}),
);
nameToBaseMetric.init(
'swingset_vat_delivery',
metricMeter.createValueRecorder('swingset_vat_delivery', {
description: 'Vat delivery time (ms)',
}),
);
nameToBaseMetric.init(
'swingset_meter_usage',
metricMeter.createValueRecorder('swingset_meter_usage', {
description: 'Vat meter usage',
}),
);
const vatToMetrics = makeStore();

/**
* This function reuses or creates per-vat named metrics.
*
* @param {string} vatID id of the vat to label the metric with
* @param {string} name name of the base metric
* @param {Record<string, string>} [labels] the metric statistic
* @returns {any} the labelled metric
*/
const getVatMetric = (vatID, name, labels = {}) => {
let nameToMetric;
if (vatToMetrics.has(vatID)) {
nameToMetric = vatToMetrics.get(vatID);
} else {
nameToMetric = makeStore('metricName');
vatToMetrics.init(vatID, nameToMetric);
}
let metric;
const labeledName = `${name}:${quote(labels)}`;
if (nameToMetric.has(labeledName)) {
metric = nameToMetric.get(labeledName);
} else {
// Bind the base metric to the vatID label.
metric = nameToBaseMetric
.get(name)
.bind({ ...METRIC_LABELS, vatID, ...labels });
nameToMetric.init(labeledName, metric);
}
return metric;
};

/**
* Measure some interesting stats. We currently do a per-vat recording of
* time spent in the vat for startup and delivery.
*/
const slogCallbacks = {
startup(_method, [vatID], finisher) {
return wrapDeltaMS(finisher, deltaMS => {
getVatMetric(vatID, 'swingset_vat_startup').record(deltaMS);
});
},
delivery(_method, [vatID], finisher) {
return wrapDeltaMS(finisher, (deltaMS, [[_status, _problem, used]]) => {
if (used) {
// Add to aggregated metering stats.
const labels = {};
if (used.meterType) {
labels.meterType = used.meterType;
}
for (const [key, value] of Object.entries(used)) {
if (key === 'meterType') {
// eslint-disable-next-line no-continue
continue;
}
getVatMetric(vatID, `swingset_meter_usage`, {
...labels,
stat: key,
}).record(value || 0);
}
}
getVatMetric(vatID, 'swingset_vat_delivery').record(deltaMS);
});
},
};
const slogCallbacks = makeSlogCallbacks({
metricMeter,
labels: METRIC_LABELS,
});

log.debug(`buildSwingset`);
console.debug(`buildSwingset`);
const { controller, mb, bridgeInbound, timer } = await buildSwingset(
mailboxStorage,
bridgeOutbound,
Expand All @@ -224,21 +118,15 @@ export async function launch(
},
);

exportKernelStats({ controller, metricMeter, log, labels: METRIC_LABELS });

const schedulerCrankTimeHistogram = metricMeter
.createValueRecorder('swingset_crank_processing_time', {
description: 'Processing time per crank (ms)',
boundaries: [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, Infinity],
})
.bind(METRIC_LABELS);

const schedulerBlockTimeHistogram = metricMeter
.createValueRecorder('swingset_block_processing_seconds', {
description: 'Processing time per block',
boundaries: HISTOGRAM_SECONDS_LATENCY_BOUNDARIES,
})
.bind(METRIC_LABELS);
const {
schedulerCrankTimeHistogram,
schedulerBlockTimeHistogram,
} = exportKernelStats({
controller,
metricMeter,
log: console,
labels: METRIC_LABELS,
});

// ////////////////////////////
// TODO: This is where we would add the scheduler.
Expand Down Expand Up @@ -276,7 +164,7 @@ export async function launch(
if (!mb.deliverInbound(sender, messages, ack)) {
return;
}
log.debug(`mboxDeliver: ADDED messages`);
console.debug(`mboxDeliver: ADDED messages`);
}

async function doBridgeInbound(source, body) {
Expand All @@ -288,7 +176,7 @@ export async function launch(

async function beginBlock(blockHeight, blockTime) {
const addedToQueue = timer.poll(blockTime);
log.debug(
console.debug(
`polled; blockTime:${blockTime}, h:${blockHeight}; ADDED =`,
addedToQueue,
);
Expand Down

0 comments on commit 877ffac

Please sign in to comment.