Skip to content

Commit

Permalink
Merge pull request #39 from alkem-io/batch-process-documents
Browse files Browse the repository at this point in the history
Batch process documents
  • Loading branch information
valentinyanakiev authored Jul 3, 2024
2 parents 99de749 + 8d8f055 commit f1aed1e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 26 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@alkemio/space-ingest",
"version": "0.6.1",
"version": "0.7.0",
"description": "",
"author": "Alkemio Foundation",
"private": true,
Expand Down
73 changes: 50 additions & 23 deletions src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import { dbConnect } from './db.connect';
import { Metadata } from 'chromadb';
import { DocumentType } from './document.type';

/**
 * Partitions an array into consecutive chunks of at most `size` elements.
 * The final chunk may be shorter when `arr.length` is not a multiple of `size`.
 *
 * Generic `<T>` replaces the previous `any[]`, so callers keep element typing
 * (e.g. batching `EmbeddingItem[]` yields `EmbeddingItem[][]`). Runtime
 * behavior is unchanged; the input array is never mutated.
 *
 * @param arr  - array to partition (read-only; a new array of slices is returned)
 * @param size - maximum chunk length; expected to be a positive integer
 * @returns array of chunks; `[]` when `arr` is empty
 */
const batch = <T>(arr: readonly T[], size: number): T[][] =>
  // Math.ceil(length / size) is the number of chunks; each slice is bounded
  // by the array end, so the last chunk simply comes out shorter.
  Array.from({ length: Math.ceil(arr.length / size) }, (_, i) =>
    arr.slice(i * size, i * size + size)
  );

export default async (
spaceNameID: string,
docs: Document[],
Expand Down Expand Up @@ -38,7 +43,6 @@ export default async (
const doc = docs[docIndex];

let splitted;
console.log(doc.metadata.type);
// do not split spreadsheets to prevent data loss
if (doc.metadata.type === DocumentType.SPREADSHEET) {
splitted = [doc];
Expand Down Expand Up @@ -67,15 +71,30 @@ export default async (

logger.info('Generating embeddings...');
const openAi = new OpenAIClient(endpoint, new AzureKeyCredential(key));

let data: EmbeddingItem[] = [];
try {
const response = await openAi.getEmbeddings(depolyment, documents);
data = response.data;
} catch (e) {
logger.error('Embeeddings error.', e);
return false;

logger.info(`Total number of chunks: ${documents.length}`);
const docBatches = batch(documents, 20);
const metadataBatches = batch(metadatas, 20);
const idsBatches = batch(ids, 20);

for (let i = 0; i < docBatches.length; i++) {
try {
const batch = docBatches[i];
const response = await openAi.getEmbeddings(depolyment, batch);
data = [...data, ...response.data];
logger.info(
`Embedding generated for batch ${i}; Batch size is: ${batch.length}`
);
} catch (e) {
logger.error('Embeeddings error.', e);
logger.error(`Metadatas for batch are: ${metadataBatches[i]}`);
}
}

logger.info('Embedding generated');
logger.info(`Total number of generated embeddings: ${data.length}`);

try {
logger.info(`Deleting old collection: ${name}`);
Expand All @@ -84,23 +103,31 @@ export default async (
logger.info(`Collection '${name}' doesn't exist.`);
}

try {
logger.info(`Creating collection: ${name}`);
const collection = await client.getOrCreateCollection({
name,
metadata: { createdAt: new Date().getTime() },
});
const embeddingsBatches = batch(data, 20);

logger.info(`Adding to collection collection: ${name}`);
await collection.upsert({
ids,
documents,
metadatas,
embeddings: data.map(({ embedding }) => embedding),
});
logger.info(`Added to collection collection: ${name}`);
} catch (e) {
logger.error(`Error adding to collection: ${name}`, e);
for (let i = 0; i < embeddingsBatches.length; i++) {
try {
logger.info(`Creating collection: ${name}`);
const collection = await client.getOrCreateCollection({
name,
metadata: { createdAt: new Date().getTime() },
});

logger.info(`Adding to collection collection: ${name}`);
await collection.upsert({
ids: idsBatches[i],
documents: docBatches[i],
metadatas: metadataBatches[i],
embeddings: embeddingsBatches[i].map(({ embedding }) => embedding),
});
logger.info(
`Batch ${i} of size ${embeddingsBatches[i].length} added to collection ${name}`
);
} catch (e) {
logger.error(`Error adding to collection: ${name}`, e);
logger.error(`Metadatas for batch are: ${metadataBatches[i]}`);
return false;
}
}
return true;
};

0 comments on commit f1aed1e

Please sign in to comment.