Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest Space feedback #50

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .env.default
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
RABBITMQ_HOST=localhost
RABBITMQ_USER=alekmio-admin
RABBITMQ_USER=alkemio-admin
RABBITMQ_PASSWORD=alkemio!
RABBITMQ_PORT=5672
RABBITMQ_QUEUE=virtual-contributor-ingest-space
RABBITMQ_INGEST_SPACE_QUEUE=virtual-contributor-ingest-space
RABBITMQ_INGEST_SPACE_RESULT_QUEUE=virtual-contributor-ingest-space-result
RABBITMQ_EXCHANGE=event-bus

API_ENDPOINT_PRIVATE_GRAPHQL='http://localhost:3000/api/private/non-interactive/graphql'
AUTH_ORY_KRATOS_PUBLIC_BASE_URL=http://localhost:3000/ory/kratos/public
AUTH_ADMIN_EMAIL=admin@alkem.io
AUTH_ADMIN_PASSWORD=changeMe
LOG_LEVEL=debug
AZURE_OPENAI_ENDPOINT=https://alkemio-gpt.openai.azure.com
AZURE_OPENAI_API_KEY=<azure-open-ai-key>
Expand All @@ -19,7 +19,7 @@ AUTH_ADMIN_EMAIL=master-admin@alkem.io
AUTH_ADMIN_PASSWORD=master-password

VECTOR_DB_HOST=localhost
VECTOR_DB_PORT=8000
VECTOR_DB_PORT=8765

CHUNK_SIZE=1000
CHUNK_OVERLAP=100
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@alkemio/space-ingest",
"version": "0.8.2",
"version": "0.9.0",
"description": "",
"author": "Alkemio Foundation",
"private": true,
Expand Down
2 changes: 1 addition & 1 deletion src/callout.handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Callout, CalloutType } from '../generated/graphql';
import { Document } from 'langchain/document';
import { baseHandler } from './base';
import { linkCollectionHandler } from './link.collection';
import { AlkemioCliClient } from 'src/graphql-client/AlkemioCliClient';
import { AlkemioCliClient } from 'src/graphql.client/AlkemioCliClient';

const handlersMap: Record<
CalloutType,
Expand Down
2 changes: 1 addition & 1 deletion src/callout.handlers/link.collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf';
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx';
import { MimeTypeDocumentMap } from '../document.type';
import { SpreadSheetLoader, DocLoader } from '../loaders';
import { AlkemioCliClient } from 'src/graphql-client/AlkemioCliClient';
import { AlkemioCliClient } from 'src/graphql.client/AlkemioCliClient';

const downloadDocument = async (
uri: string,
Expand Down
10 changes: 3 additions & 7 deletions src/ingest.ts → src/embed.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { SpaceIngestionPurpose } from './space.ingestion.purpose';
import { SpaceIngestionPurpose } from './event.bus/events/ingest.space';
import { Document } from 'langchain/document';
import { RecursiveCharacterTextSplitter } from 'langchain/text_splitter';
import { OpenAIClient, AzureKeyCredential, EmbeddingItem } from '@azure/openai';
Expand Down Expand Up @@ -152,12 +152,8 @@ export default async (
`Batch ${i} of size ${embeddingsBatches[i].length} added to collection ${name}`
);
} catch (error) {
logger.error({
...(error as Error),
error: 'Error adding to collection. Halting...',
metadata: JSON.stringify(metadataBatches[i]),
});
return false;
logger.error(error);
throw error;
}
}
return true;
Expand Down
142 changes: 142 additions & 0 deletions src/event.bus/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import amqlib, { Connection as AmqlibConnection, Channel } from 'amqplib';
import logger from '../logger';
import { IngestSpace } from './events/ingest.space';
import { IngestSpaceResult } from './events/ingest.space.result';

type ConsumeCallback = (event: IngestSpace) => void | Promise<void>;

type ConnectionConfig = {
host: string;
user: string;
password: string;
port: string;
incomingQueue: string;
outgoingQueue: string;
exchange: string;
};

export class Connection {
connection!: AmqlibConnection;
channel!: Channel;
private connected!: boolean;
private config!: ConnectionConfig;

static #instance: Connection;

static async get() {
if (!this.#instance) {
this.#instance = new Connection();
await this.#instance.connect();
}

return this.#instance;
}

private getEnvValue(key: string): string {
const value = process.env[key];
if (!value) {
throw new Error(`${key} is empty in environment.`);
}

return value;
}

private loadConfigFromEnv() {
this.config = {
host: this.getEnvValue('RABBITMQ_HOST'),
user: this.getEnvValue('RABBITMQ_USER'),
password: this.getEnvValue('RABBITMQ_PASSWORD'),
port: this.getEnvValue('RABBITMQ_PORT'),
incomingQueue: this.getEnvValue('RABBITMQ_INGEST_SPACE_QUEUE'),
outgoingQueue: this.getEnvValue('RABBITMQ_INGEST_SPACE_RESULT_QUEUE'),
exchange: this.getEnvValue('RABBITMQ_EXCHANGE'),
};
}

async connect() {
if (this.connected && this.channel) {
return;
}

try {
this.loadConfigFromEnv();

logger.info('Connecting to RabbitMQ Server');

const connectionString = `amqp://${this.config.user}:${this.config.password}@${this.config.host}:${this.config.port}`;

this.connection = await amqlib.connect(connectionString);

logger.info('Rabbit MQ Connection is ready.');

this.channel = await this.connection.createChannel();

// important! handle message in a sequemce instead of paralell; for some reason
// _spamming_ the queue with messages results in all sorts of random exceptions;
//
// being able to bomb the queue with messages is important for a collection name migration
// we need to do
this.channel.prefetch(1);

await this.channel.assertQueue(this.config.incomingQueue, {
durable: true,
});
await this.channel.assertQueue(this.config.outgoingQueue, {
durable: true,
});
await this.channel.assertExchange(this.config.exchange, 'direct');

// just one outgoing event for now; if we introduce more we can rework this to dinamically
// bind event to queues
await this.channel.bindQueue(
this.config.outgoingQueue,
this.config.exchange,
'IngestSpaceResult'
);

logger.info('RabbitMQ initialised successfully');
this.connected = true;
} catch (error) {
logger.error(error);
logger.error('Not connected to MQ Server');
}
}

async send(message: IngestSpaceResult) {
try {
if (!this.channel) {
await this.connect();
}

this.channel.sendToQueue(
this.config.outgoingQueue,
Buffer.from(JSON.stringify(message))
);
} catch (error) {
logger.error(error);
throw error;
}
}

async consume(handler: ConsumeCallback) {
this.channel.consume(
this.config.incomingQueue,
msg => {
{
if (!msg) {
return logger.error('Invalid incoming message');
}
const { spaceId, purpose, personaServiceId } = JSON.parse(
JSON.parse(msg.content.toString())
);
const event = new IngestSpace(spaceId, purpose, personaServiceId);

handler(event);
}
},
{
noAck: true,
}
);
}
}
26 changes: 26 additions & 0 deletions src/event.bus/events/ingest.space.result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { SpaceIngestionPurpose } from './ingest.space';

export enum SpaceIngestionResult {
SUCCESS = 'success',
FAILURE = 'failure',
}

export enum ErrorCode {
VECTOR_INSERT = 'vector_insert',
}

type IngestError = {
code?: ErrorCode;
message: string;
};

export class IngestSpaceResult {
constructor(
public readonly spaceId: string,
public readonly purpose: SpaceIngestionPurpose,
public readonly personaServiceId: string,
public readonly timestamp: number,
public result: SpaceIngestionResult = SpaceIngestionResult.SUCCESS,
public error?: IngestError
) {}
}
12 changes: 12 additions & 0 deletions src/event.bus/events/ingest.space.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export enum SpaceIngestionPurpose {
KNOWLEDGE = 'knowledge',
CONTEXT = 'context',
}

export class IngestSpace {
constructor(
public readonly spaceId: string,
public readonly purpose: SpaceIngestionPurpose,
public readonly personaServiceId: string
) {}
}
Loading