Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add metric for active workflow count #13420

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/@n8n/config/src/configs/endpoints.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ class PrometheusMetricsConfig {
/** How often (in seconds) to update queue metrics. */
@Env('N8N_METRICS_QUEUE_METRICS_INTERVAL')
queueMetricsInterval: number = 20;

/** Whether to include metric for active workflow count */
@Env('N8N_METRICS_INCLUDE_ACTIVE_WORKFLOW_COUNT_METRIC')
includeActiveWorkflowCountMetric: boolean = false;

/** How often (in seconds) to update active workflow metric */
@Env('N8N_METRICS_ACTIVE_WORKFLOW_METRIC_INTERVAL')
activeWorkflowCountInterval: number = 60;
Comment on lines +62 to +67
Copy link
Collaborator

@tomi tomi Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these to be configurable? Or could we simply add them for all? Asking because I would much rather see us reduce the amount of things one can configure to simplify things

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metric itself is configurable because most of the other metrics are, I don't see why it couldn't be activated just baseline if metrics are enabled. As far as the cache TTL goes, I think 60 seconds is a sensible default but whether or not that is useful depends on how often you ping /metrics. So I'm not sure I'd make that just a static value

}

@Config
Expand Down
2 changes: 2 additions & 0 deletions packages/@n8n/config/test/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ describe('GlobalConfig', () => {
includeApiStatusCodeLabel: false,
includeQueueMetrics: false,
queueMetricsInterval: 20,
includeActiveWorkflowCountMetric: false,
activeWorkflowCountInterval: 60,
},
additionalNonUIRoutes: '',
disableProductionWebhooksOnMainProcess: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
return activeWorkflows.map((workflow) => workflow.id);
}

async getActiveCount() {
return await this.count({
where: { active: true },
});
}

async findById(workflowId: string) {
return await this.findOne({
where: { id: workflowId },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { InstanceSettings } from 'n8n-core';
import promClient from 'prom-client';

import config from '@/config';
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import type { EventService } from '@/events/event.service';
import { mockInstance } from '@test/mocking';
Expand Down Expand Up @@ -52,12 +53,14 @@ describe('PrometheusMetricsService', () => {
const eventBus = mock<MessageEventBus>();
const eventService = mock<EventService>();
const instanceSettings = mock<InstanceSettings>({ instanceType: 'main' });
const workflowRepository = mock<WorkflowRepository>();
const prometheusMetricsService = new PrometheusMetricsService(
mock(),
eventBus,
globalConfig,
eventService,
instanceSettings,
workflowRepository,
);

afterEach(() => {
Expand All @@ -75,6 +78,7 @@ describe('PrometheusMetricsService', () => {
customGlobalConfig,
mock(),
instanceSettings,
mock(),
);

await customPrometheusMetricsService.init(app);
Expand Down Expand Up @@ -234,5 +238,20 @@ describe('PrometheusMetricsService', () => {
expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics
expect(eventService.on).not.toHaveBeenCalled();
});

it('should setup active workflow count metric if enabled', async () => {
prometheusMetricsService.enableMetric('activeWorkflowCount');

await prometheusMetricsService.init(app);

// First call is n8n version metric
expect(promClient.Gauge).toHaveBeenCalledTimes(2);

expect(promClient.Gauge).toHaveBeenNthCalledWith(2, {
name: 'n8n_active_workflow_count',
help: 'Total number of active workflows.',
collect: expect.any(Function),
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import promClient from 'prom-client';

import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { EventMessageWorkflow } from '@/eventbus/event-message-classes/event-message-workflow';
import type { EventService } from '@/events/event.service';
import type { CacheService } from '@/services/cache/cache.service';
import { mockInstance } from '@test/mocking';

import { MessageEventBus } from '../../eventbus/message-event-bus/message-event-bus';
Expand All @@ -15,8 +17,10 @@ jest.unmock('@/eventbus/message-event-bus/message-event-bus');

const customPrefix = 'custom_';

const cacheService = mock<CacheService>();
const eventService = mock<EventService>();
const instanceSettings = mock<InstanceSettings>({ instanceType: 'main' });
const workflowRepository = mock<WorkflowRepository>();
const app = mock<express.Application>();
const eventBus = new MessageEventBus(
mock(),
Expand Down Expand Up @@ -48,6 +52,7 @@ describe('workflow_success_total', () => {
globalConfig,
eventService,
instanceSettings,
workflowRepository,
);

await prometheusMetricsService.init(app);
Expand Down Expand Up @@ -87,6 +92,7 @@ workflow_success_total{workflow_id="1234"} 1"
globalConfig,
eventService,
instanceSettings,
workflowRepository,
);

await prometheusMetricsService.init(app);
Expand All @@ -107,3 +113,70 @@ workflow_success_total{workflow_id="1234"} 1"
}
});
});

describe('Active workflow count', () => {
const globalConfig = mockInstance(GlobalConfig, {
endpoints: {
metrics: {
prefix: '',
includeActiveWorkflowCountMetric: true,
activeWorkflowCountInterval: 30,
},
},
});

const prometheusMetricsService = new PrometheusMetricsService(
cacheService,
eventBus,
globalConfig,
eventService,
instanceSettings,
workflowRepository,
);

afterEach(() => {
jest.clearAllMocks();
prometheusMetricsService.disableAllMetrics();
});

it('should prioritize cached value', async () => {
prometheusMetricsService.enableMetric('activeWorkflowCount');
await prometheusMetricsService.init(app);

cacheService.get.mockReturnValueOnce(Promise.resolve('1'));
workflowRepository.getActiveCount.mockReturnValueOnce(Promise.resolve(2));

const activeWorkflowCount =
await promClient.register.getSingleMetricAsString('active_workflow_count');

expect(cacheService.get).toHaveBeenCalledWith('metrics:active-workflow-count');
expect(workflowRepository.getActiveCount).not.toHaveBeenCalled();

expect(activeWorkflowCount).toMatchInlineSnapshot(`
"# HELP active_workflow_count Total number of active workflows.
# TYPE active_workflow_count gauge
active_workflow_count 1"
`);
});

it('should query value from database if cache misses', async () => {
prometheusMetricsService.enableMetric('activeWorkflowCount');
await prometheusMetricsService.init(app);

cacheService.get.mockReturnValueOnce(Promise.resolve(undefined));
workflowRepository.getActiveCount.mockReturnValueOnce(Promise.resolve(2));

const activeWorkflowCount =
await promClient.register.getSingleMetricAsString('active_workflow_count');

expect(cacheService.get).toHaveBeenCalledWith('metrics:active-workflow-count');
expect(workflowRepository.getActiveCount).toHaveBeenCalled();
expect(cacheService.set).toHaveBeenCalledWith('metrics:active-workflow-count', '2', 30_000);

expect(activeWorkflowCount).toMatchInlineSnapshot(`
"# HELP active_workflow_count Total number of active workflows.
# TYPE active_workflow_count gauge
active_workflow_count 2"
`);
});
});
40 changes: 40 additions & 0 deletions packages/cli/src/metrics/prometheus-metrics.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import semverParse from 'semver/functions/parse';

import config from '@/config';
import { N8N_VERSION } from '@/constants';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { EventMessageTypes } from '@/eventbus';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
Expand All @@ -24,6 +25,7 @@ export class PrometheusMetricsService {
private readonly globalConfig: GlobalConfig,
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings,
private readonly workflowRepository: WorkflowRepository,
) {}

private readonly counters: { [key: string]: Counter<string> | null } = {};
Expand All @@ -39,6 +41,7 @@ export class PrometheusMetricsService {
cache: this.globalConfig.endpoints.metrics.includeCacheMetrics,
logs: this.globalConfig.endpoints.metrics.includeMessageEventBusMetrics,
queue: this.globalConfig.endpoints.metrics.includeQueueMetrics,
activeWorkflowCount: this.globalConfig.endpoints.metrics.includeActiveWorkflowCountMetric,
},
labels: {
credentialsType: this.globalConfig.endpoints.metrics.includeCredentialTypeLabel,
Expand All @@ -58,6 +61,7 @@ export class PrometheusMetricsService {
this.initEventBusMetrics();
this.initRouteMetrics(app);
this.initQueueMetrics();
this.initActiveWorkflowCountMetric();
this.mountMetricsEndpoint(app);
}

Expand Down Expand Up @@ -285,6 +289,42 @@ export class PrometheusMetricsService {
});
}

/**
* Setup active workflow count metric
*
* This metric is updated every time metrics are collected.
* We also cache the value of active workflow counts so we
* don't hit the database on every metrics query. Both the
* metric being enabled and the TTL of the cached value is
* configurable.
*/
private initActiveWorkflowCountMetric() {
if (!this.includes.metrics.activeWorkflowCount) return;

const workflowRepository = this.workflowRepository;
const cacheService = this.cacheService;
const cacheKey = 'metrics:active-workflow-count';
const cacheTtl = this.globalConfig.endpoints.metrics.activeWorkflowCountInterval * 1_000;

new promClient.Gauge({
name: this.prefix + 'active_workflow_count',
help: 'Total number of active workflows.',
async collect() {
const value = await cacheService.get<string>(cacheKey);
const numericValue = value !== undefined ? parseInt(value, 10) : undefined;

if (numericValue !== undefined && Number.isFinite(numericValue)) {
this.set(numericValue);
} else {
const activeWorkflowCount = await workflowRepository.getActiveCount();
await cacheService.set(cacheKey, activeWorkflowCount.toString(), cacheTtl);

this.set(activeWorkflowCount);
}
},
});
}

private toLabels(event: EventMessageTypes): Record<string, string> {
const { __type, eventName, payload } = event;

Expand Down
8 changes: 7 additions & 1 deletion packages/cli/src/metrics/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
export type MetricCategory = 'default' | 'routes' | 'cache' | 'logs' | 'queue';
export type MetricCategory =
| 'default'
| 'routes'
| 'cache'
| 'logs'
| 'queue'
| 'activeWorkflowCount';

export type MetricLabel =
| 'credentialsType'
Expand Down
Loading