diff --git a/py/core/base/providers/embedding.py b/py/core/base/providers/embedding.py
index c08bd4720..1d7b5557a 100644
--- a/py/core/base/providers/embedding.py
+++ b/py/core/base/providers/embedding.py
@@ -25,10 +25,10 @@ class EmbeddingConfig(ProviderConfig):
     batch_size: int = 1
     prefixes: Optional[dict[str, str]] = None
     add_title_as_prefix: bool = True
-    concurrent_request_limit: int = 16
-    max_retries: int = 2
-    initial_backoff: float = 1.0
-    max_backoff: float = 60.0
+    concurrent_request_limit: int = 256
+    max_retries: int = 8
+    initial_backoff: float = 1
+    max_backoff: float = 64.0
 
     def validate_config(self) -> None:
         if self.provider not in self.supported_providers:
@@ -63,6 +63,7 @@ async def _execute_with_backoff_async(self, task: dict[str, Any]):
             try:
                 async with self.semaphore:
                     return await self._execute_task(task)
+            # TODO: Capture different error types and handle them accordingly
             except Exception as e:
                 logger.warning(
                     f"Request failed (attempt {retries + 1}): {str(e)}"
diff --git a/py/core/base/providers/llm.py b/py/core/base/providers/llm.py
index 65445bc53..9a35fe77f 100644
--- a/py/core/base/providers/llm.py
+++ b/py/core/base/providers/llm.py
@@ -20,9 +20,9 @@ class CompletionConfig(ProviderConfig):
     provider: Optional[str] = None
     generation_config: GenerationConfig = GenerationConfig()
     concurrent_request_limit: int = 256
-    max_retries: int = 2
+    max_retries: int = 8
     initial_backoff: float = 1.0
-    max_backoff: float = 60.0
+    max_backoff: float = 64.0
 
     def validate_config(self) -> None:
         if not self.provider:
diff --git a/py/core/configs/r2r_aws_bedrock.toml b/py/core/configs/r2r_aws_bedrock.toml
index d83fadbc2..6cb08693f 100644
--- a/py/core/configs/r2r_aws_bedrock.toml
+++ b/py/core/configs/r2r_aws_bedrock.toml
@@ -18,7 +18,7 @@ overlap = 20
 
 [completion]
 provider = "litellm"
-concurrent_request_limit = 16
+concurrent_request_limit = 256
 
 [completion.generation_config]
 model = "bedrock/anthropic.claude-v2"
diff --git a/py/core/pipes/kg/entity_description.py b/py/core/pipes/kg/entity_description.py
index adc2b48e3..12de049b3 100644
--- a/py/core/pipes/kg/entity_description.py
+++ b/py/core/pipes/kg/entity_description.py
@@ -134,7 +134,7 @@ async def process_entity(
                         ),
                     }
                 ],
-                generation_config=self.kg_provider.config.kg_enrichment_settings.generation_config,
+                generation_config=self.kg_provider.config.kg_creation_settings.generation_config,
             )
         )
         .choices[0]
diff --git a/py/core/providers/database/relational.py b/py/core/providers/database/relational.py
index 3d097b48e..aa26e995c 100644
--- a/py/core/providers/database/relational.py
+++ b/py/core/providers/database/relational.py
@@ -1,6 +1,6 @@
 import logging
 from contextlib import asynccontextmanager
-
+import asyncio
 import asyncpg
 
 from core.base import RelationalDBProvider
@@ -35,6 +35,9 @@ def __init__(
         self.project_name = project_name
         self.pool = None
         self.postgres_configuration_settings = postgres_configuration_settings
+        self.semaphore = asyncio.Semaphore(
+            int(self.postgres_configuration_settings.max_connections * 0.9)
+        )
 
     async def initialize(self):
         try:
@@ -42,6 +45,7 @@ async def initialize(self):
                 self.connection_string,
                 max_size=self.postgres_configuration_settings.max_connections,
             )
+
             logger.info(
                 "Successfully connected to Postgres database and created connection pool."
             )
@@ -57,8 +61,9 @@ def _get_table_name(self, base_name: str) -> str:
 
     @asynccontextmanager
     async def get_connection(self):
-        async with self.pool.acquire() as conn:
-            yield conn
+        async with self.semaphore:
+            async with self.pool.acquire() as conn:
+                yield conn
 
     async def execute_query(self, query, params=None):
         async with self.get_connection() as conn:
diff --git a/py/core/providers/kg/postgres.py b/py/core/providers/kg/postgres.py
index 9cefe4452..9d5607c91 100644
--- a/py/core/providers/kg/postgres.py
+++ b/py/core/providers/kg/postgres.py
@@ -40,6 +40,7 @@ def __init__(
         self.db_provider = db_provider.relational
         self.embedding_provider = embedding_provider
 
+
         try:
             import networkx as nx
 
@@ -160,9 +161,6 @@ async def create_tables(self, project_name: str):
 
         await self.execute_query(query)
 
-        # TODO: Create another table for entity_embedding_collection
-        # entity embeddings at a collection level
-
         # communities table, result of the Leiden algorithm
         query = f"""
             CREATE TABLE IF NOT EXISTS {self._get_table_name("community")} (
diff --git a/py/core/providers/orchestration/hatchet.py b/py/core/providers/orchestration/hatchet.py
index d641d4705..01f6670fd 100644
--- a/py/core/providers/orchestration/hatchet.py
+++ b/py/core/providers/orchestration/hatchet.py
@@ -35,6 +35,7 @@ def get_worker(self, name: str, max_threads: Optional[int] = None) -> Any:
         self.worker = self.orchestrator.worker(name, max_threads)
         return self.worker
 
+
     def concurrency(self, *args, **kwargs) -> Callable:
         return self.orchestrator.concurrency(*args, **kwargs)
 
diff --git a/py/r2r.toml b/py/r2r.toml
index 10e98fc4b..313d4901e 100644
--- a/py/r2r.toml
+++ b/py/r2r.toml
@@ -18,7 +18,7 @@ default_admin_password = "change_me_immediately"
 
 [completion]
 provider = "litellm"
-concurrent_request_limit = 16
+concurrent_request_limit = 256
 
 [completion.generation_config]
 model = "openai/gpt-4o"
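
Note on the retuned settings (editor's commentary, not part of the patch): `concurrent_request_limit`, `max_retries`, `initial_backoff`, and `max_backoff` feed the providers' `_execute_with_backoff_async` loop, where a shared semaphore caps in-flight requests and each failed attempt sleeps for an exponentially growing interval capped at `max_backoff`. Below is a minimal, hypothetical sketch of that pattern; the function name is illustrative and the jitter is an added design choice, not necessarily what R2R does.

    import asyncio
    import random

    async def execute_with_backoff(
        task,        # zero-argument coroutine function to retry
        semaphore,   # shared asyncio.Semaphore(concurrent_request_limit)
        *,
        max_retries: int = 8,
        initial_backoff: float = 1.0,
        max_backoff: float = 64.0,
    ):
        backoff = initial_backoff
        for attempt in range(max_retries):
            try:
                # Semaphore bounds concurrency across all callers sharing it.
                async with semaphore:
                    return await task()
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # Full jitter keeps simultaneous retries from waking in lockstep;
                # backoff doubles per failure up to max_backoff.
                await asyncio.sleep(random.uniform(0, backoff))
                backoff = min(backoff * 2.0, max_backoff)

The relational.py change applies the same gating idea to Postgres: a semaphore sized to roughly 90% of `max_connections` wraps `pool.acquire()`, so with the higher request limits above, bursts of queries queue inside the application instead of exhausting the asyncpg connection pool.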