Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(instrumentation-pg): not add duplicate listeners to pg pool #2484

Merged
merged 8 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
} from '@opentelemetry/semantic-conventions/incubating';

export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConfig> {
private _pgPoolListenersSet: boolean = false;
private _operationDuration!: Histogram;
private _connectionsCount!: UpDownCounter;
private _connectionPendingRequests!: UpDownCounter;
Expand All @@ -85,6 +86,7 @@
}

override _updateMetricInstruments() {
this._pgPoolListenersSet = false;
this._operationDuration = this.meter.createHistogram(
METRIC_DB_CLIENT_OPERATION_DURATION,
{
Expand Down Expand Up @@ -435,6 +437,52 @@
};
}

private setPoolConnectEventListeners(pgPool: PgPoolExtended) {
if (this._pgPoolListenersSet) return;
const poolName = utils.getPoolName(pgPool.options);

pgPool.on('connect', () => {
this._connectionsCounter = utils.updateCounter(
poolName,
pgPool,
this._connectionsCount,
this._connectionPendingRequests,
this._connectionsCounter
);
});

pgPool.on('acquire', () => {
this._connectionsCounter = utils.updateCounter(
poolName,
pgPool,
this._connectionsCount,
this._connectionPendingRequests,
this._connectionsCounter
);
});

pgPool.on('remove', () => {
this._connectionsCounter = utils.updateCounter(
poolName,
pgPool,
this._connectionsCount,
this._connectionPendingRequests,
this._connectionsCounter
);
});

pgPool.on('release' as any, () => {
this._connectionsCounter = utils.updateCounter(

Check warning on line 475 in plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts

View check run for this annotation

Codecov / codecov/patch

plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts#L475

Added line #L475 was not covered by tests
poolName,
pgPool,
this._connectionsCount,
this._connectionPendingRequests,
this._connectionsCounter
);
});
this._pgPoolListenersSet = true;
}

private _getPoolConnectPatch() {
const plugin = this;
return (originalConnect: typeof pgPoolTypes.prototype.connect) => {
Expand All @@ -449,41 +497,7 @@
attributes: utils.getSemanticAttributesFromPool(this.options),
});

this.on('connect', () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

this.on('acquire', () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

this.on('remove', () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

this.on('release' as any, () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});
plugin.setPoolConnectEventListeners(this);

if (callback) {
const parentSpan = trace.getSpan(context.active());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,12 @@ export interface poolConnectionsCounter {
}

export function updateCounter(
poolName: string,
pool: PgPoolExtended,
connectionCount: UpDownCounter,
connectionPendingRequests: UpDownCounter,
latestCounter: poolConnectionsCounter
): poolConnectionsCounter {
const poolName = getPoolName(pool.options);
const all = pool.totalCount;
const pending = pool.waitingCount;
const idle = pool.idleCount;
Expand Down
159 changes: 159 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -572,5 +572,164 @@ describe('pg-pool', () => {
});
});
});

it('should not add duplicate event listeners to PgPool events', done => {
const poolAux: pgPool<pg.Client> = new pgPool(CONFIG);
let completed = 0;
poolAux.connect((err, client, release) => {
if (err) {
throw new Error(err.message);
}
if (!release) {
throw new Error('Did not receive release function');
}
if (!client) {
throw new Error('No client received');
}
assert.ok(client);
release();

assert.equal(
poolAux.listenerCount('connect'),
1,
"More than one event listener for 'connect'"
);
assert.equal(
poolAux.listenerCount('acquire'),
1,
"More than one event listener for 'acquire'"
);
assert.equal(
poolAux.listenerCount('remove'),
1,
"More than one event listener for 'remove'"
);
assert.equal(
poolAux.listenerCount('release'),
1,
"More than one event listener for 'release'"
);

completed++;
if (completed >= 2) {
done();
}
});

poolAux.connect((err, client, release) => {
if (err) {
throw new Error(err.message);
}
if (!release) {
throw new Error('Did not receive release function');
}
if (!client) {
throw new Error('No client received');
}
assert.ok(client);
release();

assert.equal(
poolAux.listenerCount('connect'),
1,
"More than one event listener for 'connect'"
);
assert.equal(
poolAux.listenerCount('acquire'),
1,
"More than one event listener for 'acquire'"
);
assert.equal(
poolAux.listenerCount('remove'),
1,
"More than one event listener for 'remove'"
);
assert.equal(
poolAux.listenerCount('release'),
1,
"More than one event listener for 'release'"
);

completed++;
if (completed >= 2) {
done();
}
});
});

it('adding a custom event listener should still work with the default event listeners to PgPool events', done => {
const poolAux: pgPool<pg.Client> = new pgPool(CONFIG);
let testValue = 0;
poolAux.on('connect', () => {
testValue = 1;
});

poolAux.connect((err, client, release) => {
if (err) {
throw new Error(err.message);
}
if (!release) {
throw new Error('Did not receive release function');
}
if (!client) {
throw new Error('No client received');
}
assert.ok(client);

client.query('SELECT NOW()', async (err, ret) => {
release();
if (err) {
throw new Error(err.message);
}
assert.ok(ret);
assert.equal(
poolAux.listenerCount('connect'),
2,
"More than one event listener for 'connect'"
);
assert.equal(
poolAux.listenerCount('acquire'),
1,
"More than one event listener for 'acquire'"
);
assert.equal(
poolAux.listenerCount('remove'),
1,
"More than one event listener for 'remove'"
);
assert.equal(
poolAux.listenerCount('release'),
1,
"More than one event listener for 'release'"
);
assert.equal(testValue, 1);

const { resourceMetrics, errors } = await metricReader.collect();
assert.deepEqual(
errors,
[],
'expected no errors from the callback during metric collection'
);

const metrics = resourceMetrics.scopeMetrics[0].metrics;
assert.strictEqual(
metrics[1].descriptor.name,
METRIC_DB_CLIENT_CONNECTION_COUNT
);
assert.strictEqual(
metrics[1].dataPoints[0].attributes[
ATTR_DB_CLIENT_CONNECTION_STATE
],
'used'
);
assert.strictEqual(
metrics[1].dataPoints[0].value,
1,
'expected to have 1 used connection'
);
done();
});
});
});
});
});
Loading