From 3e73d059b456bd44b757d070d301f1076459141d Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Thu, 3 Oct 2024 13:05:44 -0700 Subject: [PATCH 01/19] try --- py/shared/abstractions/base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/py/shared/abstractions/base.py b/py/shared/abstractions/base.py index 80a350f29..bcc840b5e 100644 --- a/py/shared/abstractions/base.py +++ b/py/shared/abstractions/base.py @@ -52,6 +52,10 @@ class Config: json_encoders = { bytes: lambda v: v.decode("utf-8", errors="ignore"), } + + +class UUID(R2RSerializable, UUID): + pass class AsyncSyncMeta(type): From 519281b15a939dbc57e1de22a29e5bcb0858d297 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Thu, 3 Oct 2024 14:17:31 -0700 Subject: [PATCH 02/19] up --- py/core/main/orchestration/hatchet/kg_workflow.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/py/core/main/orchestration/hatchet/kg_workflow.py b/py/core/main/orchestration/hatchet/kg_workflow.py index 2e8ee0dd8..7c68f2e0f 100644 --- a/py/core/main/orchestration/hatchet/kg_workflow.py +++ b/py/core/main/orchestration/hatchet/kg_workflow.py @@ -4,7 +4,7 @@ import math import uuid -from hatchet_sdk import Context +from hatchet_sdk import Context, ConcurrencyLimitStrategy from core import GenerationConfig from core.base import OrchestrationProvider @@ -40,6 +40,10 @@ def get_input_data_dict(input_data): class KGExtractDescribeEmbedWorkflow: def __init__(self, kg_service: KgService): self.kg_service = kg_service + + @orchestration_provider.concurrency(max_runs=10, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN) + def concurrency(self, context) -> str: + return str(context.workflow_input()["request"]["collection_id"]) @orchestration_provider.step(retries=1, timeout="360m") async def kg_extract(self, context: Context) -> dict: @@ -144,6 +148,9 @@ async def kg_extraction_ingress(self, context: Context) -> dict: ][ "kg_creation_settings" ], + "collection_id": context.workflow_input()[ + "request" + ]["collection_id"], } }, key=f"kg-extract-{cnt}/{len(document_ids)}", From 84c960e8779633d640d8a78db866a62c856a0b59 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Thu, 3 Oct 2024 15:50:07 -0700 Subject: [PATCH 03/19] up --- py/core/providers/orchestration/hatchet.py | 3 +++ py/shared/abstractions/base.py | 5 ----- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/py/core/providers/orchestration/hatchet.py b/py/core/providers/orchestration/hatchet.py index 41599c39f..0d8381ad9 100644 --- a/py/core/providers/orchestration/hatchet.py +++ b/py/core/providers/orchestration/hatchet.py @@ -34,6 +34,9 @@ def get_worker(self, name: str, max_threads: Optional[int] = None) -> Any: max_threads = self.config.max_threads self.worker = self.orchestrator.worker(name, max_threads) return self.worker + + def concurrency(self, *args, **kwargs) -> Callable: + return self.orchestrator.concurrency(*args, **kwargs) async def start_worker(self): if not self.worker: diff --git a/py/shared/abstractions/base.py b/py/shared/abstractions/base.py index bcc840b5e..716fb4fda 100644 --- a/py/shared/abstractions/base.py +++ b/py/shared/abstractions/base.py @@ -52,11 +52,6 @@ class Config: json_encoders = { bytes: lambda v: v.decode("utf-8", errors="ignore"), } - - -class UUID(R2RSerializable, UUID): - pass - class AsyncSyncMeta(type): _event_loop = None # Class-level shared event loop From 8f2cbecb58e446e4f49552bddbb8978899c25467 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Thu, 3 Oct 2024 15:51:10 -0700 Subject: [PATCH 04/19] space --- py/shared/abstractions/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/py/shared/abstractions/base.py b/py/shared/abstractions/base.py index 716fb4fda..80a350f29 100644 --- a/py/shared/abstractions/base.py +++ b/py/shared/abstractions/base.py @@ -53,6 +53,7 @@ class Config: bytes: lambda v: v.decode("utf-8", errors="ignore"), } + class AsyncSyncMeta(type): _event_loop = None # Class-level shared event loop From ac05dd8bbaee8d568b69593080b23b9f9172e448 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Thu, 3 Oct 2024 15:55:50 -0700 Subject: [PATCH 05/19] add it in ingestion --- py/core/main/orchestration/hatchet/ingestion_workflow.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/py/core/main/orchestration/hatchet/ingestion_workflow.py b/py/core/main/orchestration/hatchet/ingestion_workflow.py index 2e5cd27e1..3b4f82f71 100644 --- a/py/core/main/orchestration/hatchet/ingestion_workflow.py +++ b/py/core/main/orchestration/hatchet/ingestion_workflow.py @@ -2,7 +2,7 @@ import logging from typing import TYPE_CHECKING -from hatchet_sdk import Context +from hatchet_sdk import Context, ConcurrencyLimitStrategy from core.base import IngestionStatus, OrchestrationProvider, increment_version from core.base.abstractions import DocumentInfo, R2RException @@ -26,6 +26,10 @@ def hatchet_ingestion_factory( class HatchetIngestFilesWorkflow: def __init__(self, ingestion_service: IngestionService): self.ingestion_service = ingestion_service + + @orchestration_provider.concurrency(max_runs=256, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN) + def concurrency(self, context) -> str: + return str(context.workflow_input()["request"]["user"]["id"]) @orchestration_provider.step(timeout="60m") async def parse(self, context: Context) -> dict: From 0b1514242937e4a97da05c079a8327a86bfdf8eb Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Thu, 3 Oct 2024 16:26:14 -0700 Subject: [PATCH 06/19] rm ingestion --- py/core/main/api/kg_router.py | 4 ++-- py/core/main/orchestration/hatchet/ingestion_workflow.py | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/py/core/main/api/kg_router.py b/py/core/main/api/kg_router.py index 081ad3a90..378d3eb29 100644 --- a/py/core/main/api/kg_router.py +++ b/py/core/main/api/kg_router.py @@ -102,7 +102,7 @@ async def create_graph( ) workflow_input = { - "collection_id": collection_id, + "collection_id": str(collection_id), "kg_creation_settings": server_kg_creation_settings.model_dump_json(), "user": auth_user.json(), } @@ -157,7 +157,7 @@ async def enrich_graph( setattr(server_kg_enrichment_settings, key, value) workflow_input = { - "collection_id": collection_id, + "collection_id": str(collection_id), "kg_enrichment_settings": server_kg_enrichment_settings.model_dump_json(), "user": auth_user.json(), } diff --git a/py/core/main/orchestration/hatchet/ingestion_workflow.py b/py/core/main/orchestration/hatchet/ingestion_workflow.py index 3b4f82f71..2e5cd27e1 100644 --- a/py/core/main/orchestration/hatchet/ingestion_workflow.py +++ b/py/core/main/orchestration/hatchet/ingestion_workflow.py @@ -2,7 +2,7 @@ import logging from typing import TYPE_CHECKING -from hatchet_sdk import Context, ConcurrencyLimitStrategy +from hatchet_sdk import Context from core.base import IngestionStatus, OrchestrationProvider, increment_version from core.base.abstractions import DocumentInfo, R2RException @@ -26,10 +26,6 @@ def hatchet_ingestion_factory( class HatchetIngestFilesWorkflow: def __init__(self, ingestion_service: IngestionService): self.ingestion_service = ingestion_service - - @orchestration_provider.concurrency(max_runs=256, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN) - def concurrency(self, context) -> str: - return str(context.workflow_input()["request"]["user"]["id"]) @orchestration_provider.step(timeout="60m") async def parse(self, context: Context) -> dict: From 07df34a8750a5705c69f51222766a78af9dfc3da Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Thu, 3 Oct 2024 16:46:10 -0700 Subject: [PATCH 07/19] init --- py/core/base/providers/embedding.py | 9 +++++---- py/core/base/providers/llm.py | 4 ++-- py/core/configs/r2r_aws_bedrock.toml | 2 +- py/r2r.toml | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/py/core/base/providers/embedding.py b/py/core/base/providers/embedding.py index c08bd4720..1d7b5557a 100644 --- a/py/core/base/providers/embedding.py +++ b/py/core/base/providers/embedding.py @@ -25,10 +25,10 @@ class EmbeddingConfig(ProviderConfig): batch_size: int = 1 prefixes: Optional[dict[str, str]] = None add_title_as_prefix: bool = True - concurrent_request_limit: int = 16 - max_retries: int = 2 - initial_backoff: float = 1.0 - max_backoff: float = 60.0 + concurrent_request_limit: int = 256 + max_retries: int = 8 + initial_backoff: float = 1 + max_backoff: float = 64.0 def validate_config(self) -> None: if self.provider not in self.supported_providers: @@ -63,6 +63,7 @@ async def _execute_with_backoff_async(self, task: dict[str, Any]): try: async with self.semaphore: return await self._execute_task(task) + # TODO: Capture different error types and handle them accordingly except Exception as e: logger.warning( f"Request failed (attempt {retries + 1}): {str(e)}" diff --git a/py/core/base/providers/llm.py b/py/core/base/providers/llm.py index 65445bc53..9a35fe77f 100644 --- a/py/core/base/providers/llm.py +++ b/py/core/base/providers/llm.py @@ -20,9 +20,9 @@ class CompletionConfig(ProviderConfig): provider: Optional[str] = None generation_config: GenerationConfig = GenerationConfig() concurrent_request_limit: int = 256 - max_retries: int = 2 + max_retries: int = 8 initial_backoff: float = 1.0 - max_backoff: float = 60.0 + max_backoff: float = 64.0 def validate_config(self) -> None: if not self.provider: diff --git a/py/core/configs/r2r_aws_bedrock.toml b/py/core/configs/r2r_aws_bedrock.toml index e5fd2858c..c2a157af1 100644 --- a/py/core/configs/r2r_aws_bedrock.toml +++ b/py/core/configs/r2r_aws_bedrock.toml @@ -18,7 +18,7 @@ overlap = 20 [completion] provider = "litellm" -concurrent_request_limit = 16 +concurrent_request_limit = 256 [completion.generation_config] model = "bedrock/anthropic.claude-v2" diff --git a/py/r2r.toml b/py/r2r.toml index 8867a1b32..b47467dc9 100644 --- a/py/r2r.toml +++ b/py/r2r.toml @@ -14,7 +14,7 @@ default_admin_password = "change_me_immediately" [completion] provider = "litellm" -concurrent_request_limit = 16 +concurrent_request_limit = 256 [completion.generation_config] model = "openai/gpt-4o" From f47877810392154505c8e71fb71f78a419ac09a9 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Thu, 3 Oct 2024 17:35:59 -0700 Subject: [PATCH 08/19] add semaphore --- py/core/providers/kg/postgres.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/py/core/providers/kg/postgres.py b/py/core/providers/kg/postgres.py index d86b8ab40..c70c098b5 100644 --- a/py/core/providers/kg/postgres.py +++ b/py/core/providers/kg/postgres.py @@ -4,6 +4,7 @@ from uuid import UUID import asyncpg +import asyncio from graspologic.partition import HierarchicalClusters from core.base import ( @@ -44,6 +45,9 @@ def __init__( self.db_provider = db_provider.relational self.embedding_provider = embedding_provider + + self.semaphore = asyncio.Semaphore(512) + try: import networkx as nx @@ -62,7 +66,8 @@ async def initialize(self): async def execute_query( self, query: str, params: Optional[list[Any]] = None ) -> Any: - return await self.db_provider.execute_query(query, params) + async with self.semaphore: + return await self.db_provider.execute_query(query, params) async def execute_many( self, @@ -70,14 +75,16 @@ async def execute_many( params: Optional[list[tuple[Any]]] = None, batch_size: int = 1000, ) -> Any: - return await self.db_provider.execute_many(query, params, batch_size) + async with self.semaphore: + return await self.db_provider.execute_many(query, params, batch_size) async def fetch_query( self, query: str, params: Optional[Any] = None, # TODO: make this strongly typed ) -> Any: - return await self.db_provider.fetch_query(query, params) + async with self.semaphore: + return await self.db_provider.fetch_query(query, params) def _get_table_name(self, base_name: str) -> str: return self.db_provider._get_table_name(base_name) From ce48ca1e5b85ff90bf36e86be1704c8573d1265d Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 11:39:38 -0700 Subject: [PATCH 09/19] test --- py/core/providers/database/relational.py | 9 ++++++--- py/core/providers/kg/postgres.py | 1 + py/r2r.toml | 8 ++++---- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/py/core/providers/database/relational.py b/py/core/providers/database/relational.py index 3d097b48e..a83d32fcd 100644 --- a/py/core/providers/database/relational.py +++ b/py/core/providers/database/relational.py @@ -1,6 +1,6 @@ import logging from contextlib import asynccontextmanager - +import asyncio import asyncpg from core.base import RelationalDBProvider @@ -35,6 +35,7 @@ def __init__( self.project_name = project_name self.pool = None self.postgres_configuration_settings = postgres_configuration_settings + self.semaphore = asyncio.Semaphore(100) async def initialize(self): try: @@ -42,6 +43,7 @@ async def initialize(self): self.connection_string, max_size=self.postgres_configuration_settings.max_connections, ) + logger.info( "Successfully connected to Postgres database and created connection pool." ) @@ -57,8 +59,9 @@ def _get_table_name(self, base_name: str) -> str: @asynccontextmanager async def get_connection(self): - async with self.pool.acquire() as conn: - yield conn + async with self.semaphore: + async with self.pool.acquire() as conn: + yield conn async def execute_query(self, query, params=None): async with self.get_connection() as conn: diff --git a/py/core/providers/kg/postgres.py b/py/core/providers/kg/postgres.py index eaf613906..adc42d88d 100644 --- a/py/core/providers/kg/postgres.py +++ b/py/core/providers/kg/postgres.py @@ -4,6 +4,7 @@ from uuid import UUID import asyncpg +import asyncio from core.base import ( CommunityReport, diff --git a/py/r2r.toml b/py/r2r.toml index 313d4901e..4f1278a2f 100644 --- a/py/r2r.toml +++ b/py/r2r.toml @@ -21,7 +21,7 @@ provider = "litellm" concurrent_request_limit = 256 [completion.generation_config] - model = "openai/gpt-4o" + model = "azure/gpt-4o" temperature = 0.1 top_p = 1 max_tokens_to_sample = 1_024 @@ -66,17 +66,17 @@ batch_size = 256 fragment_merge_count = 4 # number of fragments to merge into a single extraction max_knowledge_triples = 100 max_description_input_length = 1024 - generation_config = { model = "openai/gpt-4o-mini" } # and other params, model used for triplet extraction + generation_config = { model = "azure/gpt-4o-mini" } # and other params, model used for triplet extraction [kg.kg_enrichment_settings] community_reports_prompt = "graphrag_community_reports_prompt" - generation_config = { model = "openai/gpt-4o-mini" } # and other params, model used for node description and graph clustering + generation_config = { model = "azure/gpt-4o-mini" } # and other params, model used for node description and graph clustering leiden_params = {} [kg.kg_search_settings] map_system_prompt = "graphrag_map_system_prompt" reduce_system_prompt = "graphrag_reduce_system_prompt" - generation_config = { model = "openai/gpt-4o-mini" } + generation_config = { model = "azure/gpt-4o-mini" } [logging] provider = "local" From 772a77e6bdd0da06fd70f9613e46bba7964bb474 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 14:10:06 -0700 Subject: [PATCH 10/19] rm duplicates --- py/core/main/orchestration/hatchet/kg_workflow.py | 4 ---- py/r2r.toml | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/py/core/main/orchestration/hatchet/kg_workflow.py b/py/core/main/orchestration/hatchet/kg_workflow.py index 81141c18e..89666ee67 100644 --- a/py/core/main/orchestration/hatchet/kg_workflow.py +++ b/py/core/main/orchestration/hatchet/kg_workflow.py @@ -41,10 +41,6 @@ class KGExtractDescribeEmbedWorkflow: def __init__(self, kg_service: KgService): self.kg_service = kg_service - @orchestration_provider.concurrency(max_runs=10, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN) - def concurrency(self, context) -> str: - return str(context.workflow_input()["request"]["collection_id"]) - @orchestration_provider.concurrency( max_runs=orchestration_provider.config.kg_creation_concurrency_limit, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, diff --git a/py/r2r.toml b/py/r2r.toml index 4f1278a2f..651c025b5 100644 --- a/py/r2r.toml +++ b/py/r2r.toml @@ -66,7 +66,7 @@ batch_size = 256 fragment_merge_count = 4 # number of fragments to merge into a single extraction max_knowledge_triples = 100 max_description_input_length = 1024 - generation_config = { model = "azure/gpt-4o-mini" } # and other params, model used for triplet extraction + generation_config = { model = "openai/gpt-4o-mini" } # and other params, model used for triplet extraction [kg.kg_enrichment_settings] community_reports_prompt = "graphrag_community_reports_prompt" From 41532f40bb9a3fbf4a80eba5cc312842480189de Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 14:10:46 -0700 Subject: [PATCH 11/19] kg_creation_settings --- py/core/pipes/kg/entity_description.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/core/pipes/kg/entity_description.py b/py/core/pipes/kg/entity_description.py index adc2b48e3..12de049b3 100644 --- a/py/core/pipes/kg/entity_description.py +++ b/py/core/pipes/kg/entity_description.py @@ -134,7 +134,7 @@ async def process_entity( ), } ], - generation_config=self.kg_provider.config.kg_enrichment_settings.generation_config, + generation_config=self.kg_provider.config.kg_creation_settings.generation_config, ) ) .choices[0] From a2c407ecaf9e89feab7b596a4bcd5ae720fa57bb Mon Sep 17 00:00:00 2001 From: --global=Shreyas Pimpalgaonkar <--global=shreyas.gp.7@gmail.com> Date: Fri, 4 Oct 2024 21:17:05 +0000 Subject: [PATCH 12/19] rm semaphores --- py/core/providers/kg/postgres.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/py/core/providers/kg/postgres.py b/py/core/providers/kg/postgres.py index adc42d88d..978037cb1 100644 --- a/py/core/providers/kg/postgres.py +++ b/py/core/providers/kg/postgres.py @@ -45,9 +45,7 @@ def __init__( self.db_provider = db_provider.relational self.embedding_provider = embedding_provider - - self.semaphore = asyncio.Semaphore(512) - + try: import networkx as nx @@ -66,8 +64,7 @@ async def initialize(self): async def execute_query( self, query: str, params: Optional[list[Any]] = None ) -> Any: - async with self.semaphore: - return await self.db_provider.execute_query(query, params) + return await self.db_provider.execute_query(query, params) async def execute_many( self, @@ -75,16 +72,14 @@ async def execute_many( params: Optional[list[tuple[Any]]] = None, batch_size: int = 1000, ) -> Any: - async with self.semaphore: - return await self.db_provider.execute_many(query, params, batch_size) + return await self.db_provider.execute_many(query, params, batch_size) async def fetch_query( self, query: str, params: Optional[Any] = None, # TODO: make this strongly typed ) -> Any: - async with self.semaphore: - return await self.db_provider.fetch_query(query, params) + return await self.db_provider.fetch_query(query, params) def _get_table_name(self, base_name: str) -> str: return self.db_provider._get_table_name(base_name) From 8bcc99c869a9fcf0ae6d0fbc599b1e5f384b0eaf Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 17:43:54 -0700 Subject: [PATCH 13/19] increase conns --- py/core/providers/database/relational.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/py/core/providers/database/relational.py b/py/core/providers/database/relational.py index a83d32fcd..aa26e995c 100644 --- a/py/core/providers/database/relational.py +++ b/py/core/providers/database/relational.py @@ -35,7 +35,9 @@ def __init__( self.project_name = project_name self.pool = None self.postgres_configuration_settings = postgres_configuration_settings - self.semaphore = asyncio.Semaphore(100) + self.semaphore = asyncio.Semaphore( + int(self.postgres_configuration_settings.max_connections * 0.9) + ) async def initialize(self): try: From a4a7a208a52c54b55aed56e00a474417ed207c5a Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 17:45:18 -0700 Subject: [PATCH 14/19] change it back --- py/r2r.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/py/r2r.toml b/py/r2r.toml index 651c025b5..e7cffcfb7 100644 --- a/py/r2r.toml +++ b/py/r2r.toml @@ -70,13 +70,13 @@ batch_size = 256 [kg.kg_enrichment_settings] community_reports_prompt = "graphrag_community_reports_prompt" - generation_config = { model = "azure/gpt-4o-mini" } # and other params, model used for node description and graph clustering + generation_config = { model = "openai/gpt-4o-mini" } # and other params, model used for node description and graph clustering leiden_params = {} [kg.kg_search_settings] map_system_prompt = "graphrag_map_system_prompt" reduce_system_prompt = "graphrag_reduce_system_prompt" - generation_config = { model = "azure/gpt-4o-mini" } + generation_config = { model = "openai/gpt-4o-mini" } [logging] provider = "local" From 5231ea44ff37af242265b736a150669f2f600c23 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 17:51:09 -0700 Subject: [PATCH 15/19] clean --- py/core/providers/kg/postgres.py | 26 ++++++++++------------ py/core/providers/orchestration/hatchet.py | 4 +--- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/py/core/providers/kg/postgres.py b/py/core/providers/kg/postgres.py index 978037cb1..9b6bae59c 100644 --- a/py/core/providers/kg/postgres.py +++ b/py/core/providers/kg/postgres.py @@ -4,7 +4,6 @@ from uuid import UUID import asyncpg -import asyncio from core.base import ( CommunityReport, @@ -45,7 +44,7 @@ def __init__( self.db_provider = db_provider.relational self.embedding_provider = embedding_provider - + try: import networkx as nx @@ -152,6 +151,7 @@ async def create_tables(self, project_name: str): await self.execute_query(query) # embeddings tables + # TODO: deprecating document ID query = f""" CREATE TABLE IF NOT EXISTS {self._get_table_name("entity_embedding")} ( id SERIAL PRIMARY KEY, @@ -159,16 +159,14 @@ async def create_tables(self, project_name: str): description TEXT NOT NULL, extraction_ids UUID[] NOT NULL, description_embedding vector({self.embedding_provider.config.base_dimension}) NOT NULL, - document_id UUID NOT NULL, - UNIQUE (name, document_id) + document_ids UUID[] NOT NULL, + collection_id UUID NOT NULL, + UNIQUE (name, collection_id) ); """ await self.execute_query(query) - # TODO: Create another table for entity_embedding_collection - # entity embeddings at a collection level - # communities table, result of the Leiden algorithm query = f""" CREATE TABLE IF NOT EXISTS {self._get_table_name("community")} ( @@ -324,33 +322,33 @@ async def add_kg_extractions( return (total_entities, total_relationships) async def get_entity_map( - self, offset: int, limit: int, document_id: UUID + self, offset: int, limit: int, document_ids: list[UUID] ) -> Dict[str, Dict[str, List[Dict[str, Any]]]]: QUERY1 = f""" WITH entities_list AS ( SELECT DISTINCT name FROM {self._get_table_name("entity_raw")} - WHERE document_id = $1 + WHERE document_id = ANY($1) ORDER BY name ASC LIMIT {limit} OFFSET {offset} ) SELECT e.name, e.description, e.category, (SELECT array_agg(DISTINCT x) FROM unnest(e.extraction_ids) x) AS extraction_ids, - e.document_id + (SELECT array_agg(DISTINCT x) FROM unnest(e.document_ids) x) AS document_ids FROM {self._get_table_name("entity_raw")} e JOIN entities_list el ON e.name = el.name - GROUP BY e.name, e.description, e.category, e.extraction_ids, e.document_id + GROUP BY e.name, e.description, e.category, e.extraction_ids, e.entity_id, e.document_ids ORDER BY e.name;""" - entities_list = await self.fetch_query(QUERY1, [document_id]) + entities_list = await self.fetch_query(QUERY1, [document_ids]) entities_list = [ Entity( name=entity["name"], description=entity["description"], category=entity["category"], extraction_ids=entity["extraction_ids"], - document_id=entity["document_id"], + document_ids=entity["document_ids"], ) for entity in entities_list ] @@ -372,7 +370,7 @@ async def get_entity_map( ORDER BY t.subject, t.predicate, t.object; """ - triples_list = await self.fetch_query(QUERY2, [document_id]) + triples_list = await self.fetch_query(QUERY2, [document_ids]) triples_list = [ Triple( subject=triple["subject"], diff --git a/py/core/providers/orchestration/hatchet.py b/py/core/providers/orchestration/hatchet.py index 7014dfe62..01f6670fd 100644 --- a/py/core/providers/orchestration/hatchet.py +++ b/py/core/providers/orchestration/hatchet.py @@ -34,9 +34,7 @@ def get_worker(self, name: str, max_threads: Optional[int] = None) -> Any: max_threads = self.config.max_threads self.worker = self.orchestrator.worker(name, max_threads) return self.worker - - def concurrency(self, *args, **kwargs) -> Callable: - return self.orchestrator.concurrency(*args, **kwargs) + def concurrency(self, *args, **kwargs) -> Callable: return self.orchestrator.concurrency(*args, **kwargs) From 0cc46716fa246c80a2700c994e0481d43424b808 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 17:51:46 -0700 Subject: [PATCH 16/19] up --- py/r2r.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/r2r.toml b/py/r2r.toml index e7cffcfb7..313d4901e 100644 --- a/py/r2r.toml +++ b/py/r2r.toml @@ -21,7 +21,7 @@ provider = "litellm" concurrent_request_limit = 256 [completion.generation_config] - model = "azure/gpt-4o" + model = "openai/gpt-4o" temperature = 0.1 top_p = 1 max_tokens_to_sample = 1_024 From df6bd316abd07e1f0d3e8a019bdb2e0bcb50a5aa Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 17:53:00 -0700 Subject: [PATCH 17/19] up --- py/core/providers/kg/postgres.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/py/core/providers/kg/postgres.py b/py/core/providers/kg/postgres.py index 9b6bae59c..4221ccab0 100644 --- a/py/core/providers/kg/postgres.py +++ b/py/core/providers/kg/postgres.py @@ -151,7 +151,6 @@ async def create_tables(self, project_name: str): await self.execute_query(query) # embeddings tables - # TODO: deprecating document ID query = f""" CREATE TABLE IF NOT EXISTS {self._get_table_name("entity_embedding")} ( id SERIAL PRIMARY KEY, @@ -159,9 +158,8 @@ async def create_tables(self, project_name: str): description TEXT NOT NULL, extraction_ids UUID[] NOT NULL, description_embedding vector({self.embedding_provider.config.base_dimension}) NOT NULL, - document_ids UUID[] NOT NULL, - collection_id UUID NOT NULL, - UNIQUE (name, collection_id) + document_id UUID NOT NULL, + UNIQUE (name, document_id) ); """ From a6d96305deca6c7112d93319eca50e951b687226 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 18:06:16 -0700 Subject: [PATCH 18/19] up --- py/core/providers/kg/postgres.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/py/core/providers/kg/postgres.py b/py/core/providers/kg/postgres.py index 4221ccab0..5225834d5 100644 --- a/py/core/providers/kg/postgres.py +++ b/py/core/providers/kg/postgres.py @@ -320,33 +320,33 @@ async def add_kg_extractions( return (total_entities, total_relationships) async def get_entity_map( - self, offset: int, limit: int, document_ids: list[UUID] + self, offset: int, limit: int, document_id: UUID ) -> Dict[str, Dict[str, List[Dict[str, Any]]]]: QUERY1 = f""" WITH entities_list AS ( SELECT DISTINCT name FROM {self._get_table_name("entity_raw")} - WHERE document_id = ANY($1) + WHERE document_id = $1 ORDER BY name ASC LIMIT {limit} OFFSET {offset} ) SELECT e.name, e.description, e.category, (SELECT array_agg(DISTINCT x) FROM unnest(e.extraction_ids) x) AS extraction_ids, - (SELECT array_agg(DISTINCT x) FROM unnest(e.document_ids) x) AS document_ids + e.document_id FROM {self._get_table_name("entity_raw")} e JOIN entities_list el ON e.name = el.name - GROUP BY e.name, e.description, e.category, e.extraction_ids, e.entity_id, e.document_ids + GROUP BY e.name, e.description, e.category, e.extraction_ids, e.entity_id, e.document_id ORDER BY e.name;""" - entities_list = await self.fetch_query(QUERY1, [document_ids]) + entities_list = await self.fetch_query(QUERY1, [document_id]) entities_list = [ Entity( name=entity["name"], description=entity["description"], category=entity["category"], extraction_ids=entity["extraction_ids"], - document_ids=entity["document_ids"], + document_id=entity["document_id"], ) for entity in entities_list ] @@ -368,7 +368,7 @@ async def get_entity_map( ORDER BY t.subject, t.predicate, t.object; """ - triples_list = await self.fetch_query(QUERY2, [document_ids]) + triples_list = await self.fetch_query(QUERY2, [document_id]) triples_list = [ Triple( subject=triple["subject"], From e0175fe914b019c67a829efea46a042f6eedd484 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 4 Oct 2024 18:07:45 -0700 Subject: [PATCH 19/19] up --- py/core/providers/kg/postgres.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/py/core/providers/kg/postgres.py b/py/core/providers/kg/postgres.py index 5225834d5..4167a7945 100644 --- a/py/core/providers/kg/postgres.py +++ b/py/core/providers/kg/postgres.py @@ -158,7 +158,7 @@ async def create_tables(self, project_name: str): description TEXT NOT NULL, extraction_ids UUID[] NOT NULL, description_embedding vector({self.embedding_provider.config.base_dimension}) NOT NULL, - document_id UUID NOT NULL, + document_id UUID NOT NULL, UNIQUE (name, document_id) ); """ @@ -336,7 +336,7 @@ async def get_entity_map( e.document_id FROM {self._get_table_name("entity_raw")} e JOIN entities_list el ON e.name = el.name - GROUP BY e.name, e.description, e.category, e.extraction_ids, e.entity_id, e.document_id + GROUP BY e.name, e.description, e.category, e.extraction_ids, e.document_id ORDER BY e.name;""" entities_list = await self.fetch_query(QUERY1, [document_id])