feat: added text summarization to pgRag #12

Closed · wants to merge 7 commits
16 changes: 16 additions & 0 deletions docs/examples/book.txt
@@ -0,0 +1,16 @@
Call me Ishmael. Some years ago—never mind how long precisely—having
little or no money in my purse, and nothing particular to interest me
on shore, I thought I would sail about a little and see the watery part
of the world. It is a way I have of driving off the spleen and
regulating the circulation. Whenever I find myself growing grim about
the mouth; whenever it is a damp, drizzly November in my soul; whenever
I find myself involuntarily pausing before coffin warehouses, and
bringing up the rear of every funeral I meet; and especially whenever
my hypos get such an upper hand of me, that it requires a strong moral
principle to prevent me from deliberately stepping into the street, and
methodically knocking people’s hats off—then, I account it high time to
get to sea as soon as I can. This is my substitute for pistol and ball.
With a philosophical flourish Cato throws himself upon his sword; I
quietly take to the ship. There is nothing surprising in this. If they
but knew it, almost all men in their degree, some time or other,
cherish very nearly the same feelings towards the ocean with me.
13 changes: 7 additions & 6 deletions package-lock.json
1 change: 1 addition & 0 deletions package.json
@@ -41,6 +41,7 @@
},
"dependencies": {
"@langchain/community": "^0.0.53",
"@langchain/core": "^0.1.62",
"@langchain/openai": "^0.0.28",
"@nearform/sql": "^1.10.5",
"@types/pdf-parse": "^1.1.4",
42 changes: 35 additions & 7 deletions src/db/documents.ts
@@ -1,28 +1,56 @@

import pg from 'pg'
import SQL from '@nearform/sql'


interface Document {
id?: number
name: string
content: string
summary: string
raw_content: string
metadata: object
}

export async function saveDocument(connPool:pg.Pool, doc:Document):Promise<{id:number}> {
export async function saveDocument(
connPool: pg.Pool,
doc: Document
): Promise<{ id: number }> {
const client = await connPool.connect()
const res = await client.query(SQL`INSERT INTO documents (name, content, raw_content, metadata)
VALUES (${doc.name}, ${doc.content}, ${doc.raw_content}, ${doc.metadata})
const res =
await client.query(SQL`INSERT INTO documents (name, content, summary, raw_content, metadata)
VALUES (${doc.name}, ${doc.content}, ${doc.summary}, ${doc.raw_content}, ${doc.metadata})
RETURNING id`)
client.release()
return res.rows[0]
}

export async function getDocument(connPool:pg.Pool, doc:{id: number}):Promise<Document|undefined> {
export async function updateDocument(
connPool: pg.Pool,
doc: Document
): Promise<{ id: number }> {
const client = await connPool.connect()
const res = await client.query(
SQL`UPDATE documents
SET name = ${doc.name},
content = ${doc.content},
summary = ${doc.summary},
raw_content = ${doc.raw_content},
metadata = ${doc.metadata}
WHERE id = ${doc.id}
RETURNING id`
)

client.release()
return res.rows[0]
}

export async function getDocument(
connPool: pg.Pool,
doc: { id: number }
): Promise<Document | undefined> {
const client = await connPool.connect()
const res = await client.query(SQL`SELECT * FROM documents WHERE id = ${doc.id}`)
const res = await client.query(
SQL`SELECT * FROM documents WHERE id = ${doc.id}`
)
client.release()
return res.rows ? res.rows[0] : undefined
}
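The queries above rely on @nearform/sql tagged templates, so every interpolated value (`${doc.name}`, `${doc.summary}`, …) becomes a bound query parameter rather than concatenated SQL. A minimal sketch of the idea behind that tag — illustrative only, not the real @nearform/sql implementation:

```typescript
// Illustrative sketch of a SQL tagged template in the style of @nearform/sql:
// interpolations become $1, $2, ... placeholders plus a values array.
// This is NOT the library's actual code, just the concept the diff relies on.
interface SqlQuery {
  text: string
  values: unknown[]
}

function sql(strings: TemplateStringsArray, ...params: unknown[]): SqlQuery {
  // Join the literal parts, replacing each interpolation with $1, $2, ...
  const text = strings.reduce(
    (acc, part, i) => (i === 0 ? part : `${acc}$${i}${part}`),
    ''
  )
  return { text, values: params }
}

const doc = { name: 'book.txt', summary: '' }
const q = sql`INSERT INTO documents (name, summary) VALUES (${doc.name}, ${doc.summary}) RETURNING id`
// q.text   -> 'INSERT INTO documents (name, summary) VALUES ($1, $2) RETURNING id'
// q.values -> ['book.txt', '']
```

Because the values travel separately from the SQL text, document content (which here includes arbitrary book text) can never change the shape of the statement.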
1 change: 1 addition & 0 deletions src/db/migrations/001.do.migrate.sql
@@ -8,6 +8,7 @@ CREATE TABLE documents (
name TEXT NOT NULL,
raw_content bytea,
content TEXT,
summary TEXT,
metadata JSONB
);

54 changes: 34 additions & 20 deletions src/index.ts
@@ -1,19 +1,20 @@
import pg from 'pg'
import { Embeddings } from "@langchain/core/embeddings"
import { Embeddings } from '@langchain/core/embeddings'
import officeparser from 'officeparser'
import { LLM } from 'langchain/llms/base'
import { migrate } from './db/migrations/migrate.js'
import { pino } from 'pino'
import * as db from './db/documents.js'
import { init as initJobQueue } from './jobs/index.js'
import { getVectorStore, insertVectorColumn } from './db/vector/index.js'
import { fileTypeFromBuffer, FileTypeResult } from 'file-type';

const logger = pino({ name: 'pg-rag' })

const logger = pino({name: 'pg-rag'})

interface PgRagOptions {
dbPool: pg.Pool,
interface PgRagOptions<T> {
dbPool: pg.Pool
embeddings: Embeddings
chatModel?: T
resetDB?: boolean // Resets the DB on initialization
}

@@ -26,51 +27,64 @@ interface RagArgs {
prompt: string
}

function isOfficeFileType(fileType: FileTypeResult|undefined) {
return fileType && ['docx', 'pptx', 'xlsx', 'odt', 'odp', 'ods', 'pdf'].includes(fileType.ext.toLowerCase())
function isOfficeFileType(fileType: FileTypeResult | undefined) {
return (
fileType &&
['docx', 'pptx', 'xlsx', 'odt', 'odp', 'ods', 'pdf'].includes(
fileType.ext.toLowerCase()
)
)
}


export async function init(options:PgRagOptions) {
export async function init<T extends LLM>(options: PgRagOptions<T>) {
logger.info('Initializing')

if(options.resetDB) {
if (options.resetDB) {
await migrate(options.dbPool, '0')
}
await migrate(options.dbPool, '1')
await insertVectorColumn(options.dbPool, options.embeddings)

const jobQueue = await initJobQueue(options.dbPool, options.embeddings)
const jobQueue = await initJobQueue(
options.dbPool,
options.embeddings,
options.chatModel
)
const vectorStore = getVectorStore(options.dbPool, options.embeddings)

const saveDocument = async (args: SaveArgs):Promise<string|null> => {
const saveDocument = async (args: SaveArgs): Promise<string | null> => {
try {
logger.debug('Parsing document')
const fileType = await fileTypeFromBuffer(args.data)
let content:string|null = null
if(isOfficeFileType(fileType)) {
let content: string | null = null
if (isOfficeFileType(fileType)) {
content = await officeparser.parseOfficeAsync(args.data)
} else if (fileType) {
throw new Error(`Unsupported file of mime type "${fileType.mime}" with extension "${fileType.ext}". Check the documentation for what types of files are supported by this library.`)
throw new Error(
`Unsupported file of mime type "${fileType.mime}" with extension "${fileType.ext}". Check the documentation for what types of files are supported by this library.`
)
} else {
content = args.data.toString('utf8')
}
logger.debug('Document parsed')

const doc = await db.saveDocument(options.dbPool, {
name: args.name,
raw_content: args.data.toString('base64'),
content: content,
summary: '',
metadata: {}
})
return await jobQueue.processDocument({documentId: doc.id})
} catch(err) {

return await jobQueue.processDocument({ documentId: doc.id })
} catch (err) {
logger.error(err)
throw err
}
}

const search = async(args: RagArgs ) => {
return await vectorStore.similaritySearch(args.prompt, 1);
const search = async (args: RagArgs) => {
return await vectorStore.similaritySearch(args.prompt, 1)
}

const shutdown = async () => {
@@ -85,4 +99,4 @@ export async function init(options:PgRagOptions) {
pgBoss: jobQueue.pgBoss,
shutdown
}
}
}
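The `saveDocument` flow above routes an uploaded buffer three ways: recognized office formats go through `officeparser`, other recognized binary types are rejected, and anything without a detectable signature is treated as UTF-8 text. A self-contained sketch of just that routing decision (the `FileTypeResult` shape mirrors what the `file-type` package returns; the detection itself is assumed to have already run):

```typescript
// Sketch of the routing decision saveDocument applies to an uploaded buffer.
// FileTypeResult mirrors the shape returned by the `file-type` package; the
// binary-signature detection itself is out of scope here.
interface FileTypeResult {
  ext: string
  mime: string
}

type ParseRoute = 'office' | 'plain-text' | 'unsupported'

const OFFICE_EXTS = ['docx', 'pptx', 'xlsx', 'odt', 'odp', 'ods', 'pdf']

function routeForFileType(fileType: FileTypeResult | undefined): ParseRoute {
  if (fileType && OFFICE_EXTS.includes(fileType.ext.toLowerCase())) {
    // Handled by officeparser.parseOfficeAsync in the diff
    return 'office'
  }
  if (fileType) {
    // Recognized binary signature the library cannot parse -> throws in the diff
    return 'unsupported'
  }
  // No detectable binary signature: decoded as UTF-8 text
  return 'plain-text'
}
```

Note that the extension is lower-cased before the lookup, so detection results like `PDF` still match.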
99 changes: 68 additions & 31 deletions src/jobs/index.ts
@@ -1,16 +1,26 @@
import PGBoss from 'pg-boss'
import pg from 'pg'
const logger = pino({ name: 'pg-boss'})
const logger = pino({ name: 'pg-boss' })
import { pino } from 'pino'
import * as processDocs from './job_process_document.js'
import { Embeddings } from "@langchain/core/embeddings"
import { Embeddings } from '@langchain/core/embeddings'
import { LLM } from 'langchain/llms/base'

interface Job {
id: string,
state: 'created' | 'archive' | 'active' | 'retry' | 'expired' | 'failed' | 'cancelled' | 'archive' | 'completed'
id: string
state:
| 'created'
| 'archive'
| 'active'
| 'retry'
| 'expired'
| 'failed'
| 'cancelled'
| 'completed'
}

function getJobEndedBooleanStatus(job:Job) {
function getJobEndedBooleanStatus(job: Job) {
if (['expired', 'failed', 'cancelled'].includes(job.state)) {
return false
} else if (['archive', 'completed'].includes(job.state)) {
@@ -20,27 +30,45 @@ function getJobEndedBooleanStatus(job:Job) {
}
}

function jobCheck(pgBoss:PGBoss, jobId:string, resolve:(success:boolean)=>void, reject: (err:Error) => void, nextDelay=500) {
pgBoss.getJobById(jobId).then(job => {
if(!job) {
return reject(new Error('Could not find job with this id'))
}
function jobCheck(
pgBoss: PGBoss,
jobId: string,
resolve: (success: boolean) => void,
reject: (err: Error) => void,
nextDelay = 500
) {
pgBoss
.getJobById(jobId)
.then(job => {
if (!job) {
return reject(new Error('Could not find job with this id'))
}

const jobSuccessful = getJobEndedBooleanStatus(job)
if (jobSuccessful === false) {
resolve(false)
} else if (jobSuccessful === true) {
resolve(true)
} else {
setTimeout(() => {
jobCheck(pgBoss, jobId, resolve, reject, Math.min(nextDelay + nextDelay/2, 5000))
}, nextDelay)
}
}).catch(reject)
const jobSuccessful = getJobEndedBooleanStatus(job)
if (jobSuccessful === false) {
resolve(false)
} else if (jobSuccessful === true) {
resolve(true)
} else {
setTimeout(() => {
jobCheck(
pgBoss,
jobId,
resolve,
reject,
Math.min(nextDelay + nextDelay / 2, 5000)
)
}, nextDelay)
}
})
.catch(reject)
}

export async function init(pool:pg.Pool, embeddings: Embeddings, ) {

export async function init<T extends LLM>(
pool: pg.Pool,
embeddings: Embeddings,
chatModel?: T
) {
const pgBoss = new PGBoss({
db: {
executeSql: async (text, values) => {
@@ -52,21 +80,30 @@ export async function init(pool:pg.Pool, embeddings: Embeddings, ) {
}
})

await pgBoss.start();
logger.info('Started');
await pgBoss.start()
logger.info('Started')

const queueJob = async<T extends object>(queueName:string, jobData:T):Promise<string|null> => {
logger.info({msg: 'Queuing job', queueName, jobData})
const queueJob = async <T extends object>(
queueName: string,
jobData: T
): Promise<string | null> => {
logger.info({ msg: 'Queuing job', queueName, jobData })
return await pgBoss.send(queueName, jobData)
}

pgBoss.work<processDocs.JobData>(processDocs.QUEUE_NAME, processDocs.createJobProcessor({pool, embeddings, pgBoss}))
const processDocument = async (doc:processDocs.JobData) => {
const jobId = await queueJob<processDocs.JobData>(processDocs.QUEUE_NAME, doc)
pgBoss.work<processDocs.JobData>(
processDocs.QUEUE_NAME,
processDocs.createJobProcessor<T>({ pool, embeddings, pgBoss, chatModel })
)
const processDocument = async (doc: processDocs.JobData) => {
const jobId = await queueJob<processDocs.JobData>(
processDocs.QUEUE_NAME,
doc
)
return jobId
}

const waitForDocumentProcessed = (jobId:string):Promise<boolean> => {
const waitForDocumentProcessed = (jobId: string): Promise<boolean> => {
return new Promise<boolean>((resolve, reject) => {
jobCheck(pgBoss, jobId, resolve, reject)
})
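`jobCheck` polls pg-boss for the job's terminal state with a growing delay: each retry waits 1.5x the previous delay, capped at 5000 ms, starting from 500 ms. The schedule that produces can be computed in isolation:

```typescript
// Sketch of the polling schedule jobCheck uses while waiting for a job to
// finish: delay grows by 50% per retry, capped at 5000 ms.
function nextPollDelay(current: number): number {
  return Math.min(current + current / 2, 5000)
}

function pollSchedule(initial = 500, steps = 8): number[] {
  const delays: number[] = []
  let d = initial
  for (let i = 0; i < steps; i++) {
    delays.push(d)
    d = nextPollDelay(d)
  }
  return delays
}
// pollSchedule() -> [500, 750, 1125, 1687.5, 2531.25, 3796.875, 5000, 5000]
```

So a long-running document job is polled aggressively at first, then settles into a steady 5-second interval instead of hammering the jobs table.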