Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(Postgres Node): Backport connection pooling to postgres v1 #12484

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/* eslint-disable n8n-nodes-base/node-dirname-against-convention */
import { PostgresChatMessageHistory } from '@langchain/community/stores/message/postgres';
import { BufferMemory, BufferWindowMemory } from 'langchain/memory';
import { configurePostgres } from 'n8n-nodes-base/dist/nodes/Postgres/transport';
import type { PostgresNodeCredentials } from 'n8n-nodes-base/dist/nodes/Postgres/v2/helpers/interfaces';
import { postgresConnectionTest } from 'n8n-nodes-base/dist/nodes/Postgres/v2/methods/credentialTest';
import { configurePostgres } from 'n8n-nodes-base/dist/nodes/Postgres/v2/transport';
import type {
ISupplyDataFunctions,
INodeType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import {
type PGVectorStoreArgs,
} from '@langchain/community/vectorstores/pgvector';
import type { EmbeddingsInterface } from '@langchain/core/embeddings';
import { configurePostgres } from 'n8n-nodes-base/dist/nodes/Postgres/transport';
import type { PostgresNodeCredentials } from 'n8n-nodes-base/dist/nodes/Postgres/v2/helpers/interfaces';
import { configurePostgres } from 'n8n-nodes-base/dist/nodes/Postgres/v2/transport';
import type { INodeProperties } from 'n8n-workflow';
import type pg from 'pg';

Expand Down
8 changes: 8 additions & 0 deletions packages/nodes-base/credentials/Postgres.credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ export class Postgres implements ICredentialType {
},
default: '',
},
{
displayName: 'Maximum Number of Connections',
name: 'maxConnections',
type: 'number',
default: 100,
description:
'Make sure this value times the number of workers you have is lower than the maximum number of connections your postgres instance allows.',
},
{
displayName: 'Ignore SSL Issues (Insecure)',
name: 'allowUnauthorizedCerts',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import type {
INodeListSearchItems,
} from 'n8n-workflow';

import { configurePostgres } from './transport';
import type { PgpDatabase, PostgresNodeCredentials } from './v2/helpers/interfaces';
import { configurePostgres } from './v2/transport';

export function prepareNames(id: string, mode: string, additionalFields: IDataObject) {
let suffix = id.replace(/-/g, '_');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import type {
PgpConnectionParameters,
PostgresNodeCredentials,
PostgresNodeOptions,
} from '../helpers/interfaces';
} from '../v2/helpers/interfaces';

const getPostgresConfig = (
credentials: PostgresNodeCredentials,
Expand All @@ -29,6 +29,7 @@ const getPostgresConfig = (
user: credentials.user,
password: credentials.password,
keepAlive: true,
max: credentials.maxConnections,
};

if (options.connectionTimeout) {
Expand Down
75 changes: 23 additions & 52 deletions packages/nodes-base/nodes/Postgres/v1/PostgresV1.node.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type {
ICredentialsDecrypted,
ICredentialTestFunctions,
IDataObject,
IExecuteFunctions,
INodeCredentialTestResult,
INodeExecutionData,
Expand All @@ -10,11 +9,12 @@ import type {
INodeTypeDescription,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import pgPromise from 'pg-promise';

import { oldVersionNotice } from '@utils/descriptions';

import { pgInsertV2, pgQueryV2, pgUpdate, wrapData } from './genericFunctions';
import { configurePostgres } from '../transport';
import type { PgpConnection, PostgresNodeCredentials } from '../v2/helpers/interfaces';

const versionDescription: INodeTypeDescription = {
displayName: 'Postgres',
Expand Down Expand Up @@ -298,33 +298,27 @@ export class PostgresV1 implements INodeType {
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as IDataObject;
try {
const pgp = pgPromise();
const config: IDataObject = {
host: credentials.host as string,
port: credentials.port as number,
database: credentials.database as string,
user: credentials.user as string,
password: credentials.password as string,
};
const credentials = credential.data as PostgresNodeCredentials;

if (credentials.allowUnauthorizedCerts === true) {
config.ssl = {
rejectUnauthorized: false,
};
} else {
config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
config.sslmode = (credentials.ssl as string) || 'disable';
}
let connection: PgpConnection | undefined;

const db = pgp(config);
await db.connect();
Copy link
Contributor Author

@despairblue despairblue Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would have left the connection hanging. It needs to be ended:

https://vitaly-t.github.io/pg-promise/Database.html#connect

The connection must be released in the end of the chain by calling done() on the connection object.

try {
const { db } = await configurePostgres.call(this, credentials, {});

// Acquires a new connection that can be used to to run multiple
// queries on the same connection and must be released again
// manually.
connection = await db.connect();
} catch (error) {
return {
status: 'Error',
message: error.message,
};
} finally {
if (connection) {
// release connection
await connection.done();
}
}
return {
status: 'OK',
Expand All @@ -335,42 +329,19 @@ export class PostgresV1 implements INodeType {
};

async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const credentials = await this.getCredentials('postgres');
const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres');
const largeNumbersOutput = this.getNodeParameter(
'additionalFields.largeNumbersOutput',
0,
'',
) as string;

const pgp = pgPromise();

if (largeNumbersOutput === 'numbers') {
pgp.pg.types.setTypeParser(20, (value: string) => {
return parseInt(value, 10);
});
pgp.pg.types.setTypeParser(1700, (value: string) => {
return parseFloat(value);
});
}

const config: IDataObject = {
host: credentials.host as string,
port: credentials.port as number,
database: credentials.database as string,
user: credentials.user as string,
password: credentials.password as string,
};

if (credentials.allowUnauthorizedCerts === true) {
config.ssl = {
rejectUnauthorized: false,
};
} else {
config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
config.sslmode = (credentials.ssl as string) || 'disable';
}

const db = pgp(config);
const { db, pgp } = await configurePostgres.call(this, credentials, {
largeNumbersOutput:
largeNumbersOutput === 'numbers' || largeNumbersOutput === 'text'
? largeNumbersOutput
: undefined,
});

let returnItems: INodeExecutionData[] = [];

Expand Down
2 changes: 1 addition & 1 deletion packages/nodes-base/nodes/Postgres/v2/actions/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { NodeExecutionOutput, NodeOperationError } from 'n8n-workflow';

import * as database from './database/Database.resource';
import type { PostgresType } from './node.type';
import { configurePostgres } from '../../transport';
import type { PostgresNodeCredentials, PostgresNodeOptions } from '../helpers/interfaces';
import { configureQueryRunner } from '../helpers/utils';
import { configurePostgres } from '../transport';

export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let returnData: INodeExecutionData[] = [];
Expand Down
2 changes: 2 additions & 0 deletions packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export type EnumInfo = {
export type PgpClient = pgPromise.IMain<{}, pg.IClient>;
export type PgpDatabase = pgPromise.IDatabase<{}, pg.IClient>;
export type PgpConnectionParameters = pg.IConnectionParameters<pg.IClient>;
export type PgpConnection = pgPromise.IConnected<{}, pg.IClient>;
export type ConnectionsData = { db: PgpDatabase; pgp: PgpClient };

export type QueriesRunner = (
Expand Down Expand Up @@ -57,6 +58,7 @@ export type PostgresNodeCredentials = {
database: string;
user: string;
password: string;
maxConnections: number;
allowUnauthorizedCerts?: boolean;
ssl?: 'disable' | 'allow' | 'require' | 'verify' | 'verify-full';
} & (
Expand Down
16 changes: 7 additions & 9 deletions packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,21 @@ import type {
INodeCredentialTestResult,
} from 'n8n-workflow';

import type { PgpClient, PostgresNodeCredentials } from '../helpers/interfaces';
import { configurePostgres } from '../transport';
import { configurePostgres } from '../../transport';
import type { PgpConnection, PostgresNodeCredentials } from '../helpers/interfaces';

export async function postgresConnectionTest(
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as PostgresNodeCredentials;

let pgpClientCreated: PgpClient | undefined;
let connection: PgpConnection | undefined;

try {
const { db, pgp } = await configurePostgres.call(this, credentials, {});
const { db } = await configurePostgres.call(this, credentials, {});

pgpClientCreated = pgp;

await db.connect();
connection = await db.connect();
} catch (error) {
let message = error.message as string;

Expand All @@ -41,8 +39,8 @@ export async function postgresConnectionTest(
message,
};
} finally {
if (pgpClientCreated) {
pgpClientCreated.end();
Copy link
Contributor Author

@despairblue despairblue Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would shut down all pools in the process:
https://vitaly-t.github.io/pg-promise/module-pg-promise.html#~end

Shuts down all connection pools created in the process, so it can terminate without delay. It is available as pgp.end, after initializing the library.

Doing this would render any pool for any credential unusable until restarting n8n or waiting for the pool to be destroyed by the connection pool manager and every execution would throw this:
Connection pool of the database object has been destroyed.

if (connection) {
await connection.done();
}
}
return {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow';

import { configurePostgres } from '../../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces';
import { configurePostgres } from '../transport';

export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow';

import { configurePostgres } from '../../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces';
import { getTableSchema } from '../helpers/utils';
import { configurePostgres } from '../transport';

export async function getColumns(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { ILoadOptionsFunctions, ResourceMapperFields, FieldType } from 'n8n-workflow';

import { configurePostgres } from '../../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces';
import { getEnumValues, getEnums, getTableSchema, uniqueColumns } from '../helpers/utils';
import { configurePostgres } from '../transport';

const fieldTypeMapping: Partial<Record<FieldType, string[]>> = {
string: ['text', 'varchar', 'character varying', 'character', 'char'],
Expand Down
Loading