-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPostgres.js
98 lines (79 loc) · 2.25 KB
/
Postgres.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import {execSync} from 'node:child_process';
import pg from 'pg';
import fs from 'node:fs';
import QueryStream from 'pg-query-stream';
/*
* Aggregate functions are returned as BigInts which by default are converted to strings
* This changes floats and ints to their respective types
*/
pg.types.setTypeParser(1700, parseFloat)
pg.types.setTypeParser(20, parseInt);
const {TEST_DB_DATA_PATH, TEST_DB_SCHEMA_PATH} = process.env;
const schemaSql = fs.readFileSync(TEST_DB_SCHEMA_PATH);
const insertDataSql = fs.readFileSync(TEST_DB_DATA_PATH);
export default class Postgres {
constructor(credentials) {
this.credentials = credentials;
}
async init() {
// We have to connect to the docker instance to run in the mysql dump via a query
execSync(
`docker exec -i --env PGPASSWORD=${this.credentials.password} dare_db psql -U postgres -h ${this.credentials.host} postgres`,
{
input: `CREATE DATABASE ${this.credentials.database}; \\c ${this.credentials.database}; ${schemaSql}`,
}
);
const client = new pg.Client({
...this.credentials,
user: 'postgres',
});
await client.connect();
// Initiate the connection
this.conn = client;
// Extract the tables
const tables = await this.conn.query(`
SELECT table_name AS table
FROM information_schema.tables
WHERE table_catalog = '${this.credentials.database}' AND table_schema = 'public'
`);
this.tables = tables.rows.map(({table}) => table);
return this.conn;
}
async query(request) {
// Execute the query
const {command, rowCount, rows} = await this.conn.query(
request.text,
request?.values
);
// Return SELECT rows
if (command === 'SELECT') {
return rows;
}
// Else, return a node-mysql'ish response
return {
insertId: rows?.[0]?.id,
affectedRows: rowCount,
};
}
async resetDbState() {
const resetDataSql = `
TRUNCATE TABLE ${this.tables.join()};\n
${insertDataSql}
`;
await this.conn.query(resetDataSql);
}
stream(request, streamOptions = {objectMode: true, highWaterMark: 5}) {
// Stream query results from the DB
const queryStream = new QueryStream(
request.text,
request?.values,
streamOptions
);
// @ts-ignore
this.conn.query(queryStream);
return queryStream;
}
end() {
return this.conn.end();
}
}