Commit: Add hatchet logging (#1391)

* Add KG tests (#1351)

* cli tests

* add sdk tests

* typo fix

* change workflow ordering

* add collection integration tests (#1352)

* bump pkg

* remove workflows

* fix sdk test port

* fix delete collection return check

* Fix document info serialization (#1353)

* Update integration-test-workflow-debian.yml

* pre-commit

* slightly modify

* up

* up

* smaller file

* up

* typo, change order

* up

* up

* change order

---------

Co-authored-by: emrgnt-cmplxty <68796651+emrgnt-cmplxty@users.noreply.github.com>
Co-authored-by: emrgnt-cmplxty <owen@algofi.org>
Co-authored-by: Nolan Tremelling <34580718+NolanTrem@users.noreply.github.com>

* add graphrag docs (#1362)

* add documentation

* up

* Update js/sdk/src/models.tsx

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* pre-commit

---------

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* Concurrent index creation, allow -1 for paginated entries (#1363)

* update webdev-template for current next.js and r2r-js sdk (#1218)

Co-authored-by: Simeon <simeon@theobald.nz>

* Feature/extend integration tests rebased (#1361)

* cleanups

* add back overzealous edits

* extend workflows

* fix full setup

* simplify cli

* add ymls

* rename to light

* try again

* start light

* add cli tests

* fix

* fix

* testing..

* trying complete matrix testflow

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* up

* up

* up

* All actions

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* try offic pgvec formula

* sudo make

* sudo make

* push and pray

* push and pray

* add new actions

* add new actions

* docker push & pray

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* setup docker

* setup docker

* fix default

* fix default

* Feature/rebase to r2r vars (#1364)

* cleanups

* add back overzealous edits

* extend workflows

* fix full setup

* simplify cli

* add ymls

* rename to light

* try again

* start light

* add cli tests

* fix

* fix

* testing..

* trying complete matrix testflow

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* up

* up

* up

* All actions

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* try offic pgvec formula

* sudo make

* sudo make

* push and pray

* push and pray

* add new actions

* add new actions

* docker push & pray

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* setup docker

* setup docker

* fix default

* fix default

* make changes

* update the windows workflow

* update the windows workflow

* remove extra workflows for now

* bump pkg

* push and pray

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive tests

* revive tests

* revive tests

* revive tests

* update tests

* fix typos (#1366)

* update tests

* up

* up

* up

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* Add ingestion concurrency limit (#1367)

* up

* up

* up

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* tweaks and fixes

* Fix Ollama Tool Calling (#1372)

* Update graphrag.mdx

* Fix Ollama tool calling

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* Clean up Docker Compose (#1368)

* Fix hatchet, dockerfile

* Update compose

* point to correct docker image

* Fix bug in deletion, better validation error handling (#1374)

* Update graphrag.mdx

* Fix bug in deletion, better validation error handling

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* vec index creation endpoint (#1373)

* Update graphrag.mdx

* upload files

* create vector index endpoint

* add to fastapi background task

* pre-commit

* move logging

* add api spec, support for all vecs

* pre-commit

* add workflow

* Modify KG Endpoints and update API spec (#1369)

* Update graphrag.mdx

* modify API endpoints and update documentation

* Update ingestion_router.py

* try different docker setup (#1371)

* try different docker setup

* action

* add login

* add full

* update action

* cleanup upload script

* cleanup upload script

* tweak action

* tweak action

* tweak action

* tweak action

* tweak action

* tweak action

* Nolan/ingest chunks js (#1375)

* Update graphrag.mdx

* Clean up ingest chunks, add to JS SDK

* Update JS docs

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* up (#1376)

* Bump JS package (#1378)

* Fix Create Graph (#1379)

* up

* up

* modify assertion

* up

* up

* increase entity limit

* changing aristotle back to v2

* pre-commit

* typos

* add test_ingest_sample_file_2_sdk

* Update server.py

* add docs and refine code

* add python SDK documentation

* up

* add logs

* clean

* rm vq

* rm conflicts

* pre-commit

* up

* add logging

* update logs

* up

* up

* Update kg_service.py

---------

Co-authored-by: emrgnt-cmplxty <68796651+emrgnt-cmplxty@users.noreply.github.com>
Co-authored-by: emrgnt-cmplxty <owen@algofi.org>
Co-authored-by: Nolan Tremelling <34580718+NolanTrem@users.noreply.github.com>
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
Co-authored-by: FutureProofTechOps <operations@theobald.nz>
Co-authored-by: Simeon <simeon@theobald.nz>
Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>
8 people authored Oct 14, 2024
1 parent c93f7f3 commit db3acfd
Showing 16 changed files with 262 additions and 42 deletions.
4 changes: 0 additions & 4 deletions py/Dockerfile
@@ -49,8 +49,4 @@ COPY r2r.toml /app/r2r.toml
 COPY pyproject.toml /app/pyproject.toml

 # Run the application
-<<<<<<< HEAD
-CMD ["sh", "-c", "uvicorn core.main.app_entry:app --host $HOST --port $R2R_PORT"]
-=======
 CMD ["sh", "-c", "uvicorn core.main.app_entry:app --host $R2R_HOST --port $R2R_PORT"]
->>>>>>> 8ae04c5bfdbeab77073b6ae1169c5bff1b32489b
1 change: 1 addition & 0 deletions py/core/base/providers/embedding.py
@@ -10,6 +10,7 @@
VectorSearchResult,
default_embedding_prefixes,
)

from .base import Provider, ProviderConfig

logger = logging.getLogger(__name__)
1 change: 1 addition & 0 deletions py/core/base/providers/kg.py
@@ -196,6 +196,7 @@ async def get_entity_count(
self,
collection_id: Optional[UUID] = None,
document_id: Optional[UUID] = None,
distinct: bool = False,
entity_table_name: str = "entity_embedding",
) -> int:
"""Abstract method to get the entity count."""
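
The new distinct flag only appears here in the abstract signature; the concrete counting logic is not part of this diff. A hedged usage sketch follows — the provider instance and collection id are illustrative placeholders, not names taken from the repository:

# Sketch only: a possible call site for the new "distinct" parameter, counting
# unique entities in a collection rather than raw entity rows. "kg_provider"
# and "collection_id" below are placeholders for illustration, not code from
# this commit.
from uuid import UUID


async def count_unique_entities(kg_provider, collection_id: UUID) -> int:
    return await kg_provider.get_entity_count(
        collection_id=collection_id,
        distinct=True,
    )
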
1 change: 1 addition & 0 deletions py/core/main/api/ingestion_router.py
@@ -10,6 +10,7 @@
from pydantic import Json

from core.base import R2RException, RawChunk, generate_document_id

from core.base.api.models import (
CreateVectorIndexResponse,
WrappedCreateVectorIndexResponse,
1 change: 0 additions & 1 deletion py/core/main/api/kg_router.py
@@ -18,7 +18,6 @@
from core.utils import generate_default_user_collection_id
from shared.abstractions.kg import KGRunType
from shared.utils.base_utils import update_settings_from_dict

from ..services.kg_service import KgService
from .base_router import BaseRouter

67 changes: 57 additions & 10 deletions py/core/main/orchestration/hatchet/kg_workflow.py
@@ -3,14 +3,17 @@
import logging
import math
import uuid
import time

from hatchet_sdk import ConcurrencyLimitStrategy, Context

from core import GenerationConfig
from core.base import OrchestrationProvider

from shared.abstractions.document import KGExtractionStatus
from ...services import KgService

from shared.utils import create_hatchet_logger

logger = logging.getLogger(__name__)
from typing import TYPE_CHECKING

@@ -57,9 +60,7 @@ def concurrency(self, context: Context) -> str:
@orchestration_provider.step(retries=1, timeout="360m")
async def kg_extract(self, context: Context) -> dict:

context.log(
f"Running KG Extraction for input: {context.workflow_input()['request']}"
)
start_time = time.time()

input_data = get_input_data_dict(
context.workflow_input()["request"]
@@ -70,12 +71,16 @@ async def kg_extract(self, context: Context) -> dict:

await self.kg_service.kg_triples_extraction(
document_id=uuid.UUID(document_id),
logger=context.log,
logger=create_hatchet_logger(context.log),
**input_data["kg_creation_settings"],
)

context.log(
f"Successfully ran kg triples extraction for document {document_id}"
)

return {
"result": f"successfully ran kg triples extraction for document {document_id}"
"result": f"successfully ran kg triples extraction for document {document_id} in {time.time() - start_time:.2f} seconds",
}

@orchestration_provider.step(
@@ -90,13 +95,44 @@ async def kg_entity_description(self, context: Context) -> dict:

await self.kg_service.kg_entity_description(
document_id=uuid.UUID(document_id),
logger=create_hatchet_logger(context.log),
**input_data["kg_creation_settings"],
)

context.log(
f"Successfully ran kg node description for document {document_id}"
)

return {
"result": f"successfully ran kg node description for document {document_id}"
}

@orchestration_provider.failure()
async def on_failure(self, context: Context) -> None:
request = context.workflow_input().get("request", {})
document_id = request.get("document_id")

if not document_id:
context.log(
"No document id was found in workflow input to mark a failure."
)
return

try:
await self.kg_service.providers.database.relational.set_workflow_status(
id=uuid.UUID(document_id),
status_type="kg_extraction_status",
status=KGExtractionStatus.FAILED,
)
context.log(
f"Updated KG extraction status for {document_id} to FAILED"
)

except Exception as e:
context.log(
f"Failed to update document status for {document_id}: {e}"
)

@orchestration_provider.workflow(name="create-graph", timeout="360m")
class CreateGraphWorkflow:
def __init__(self, kg_service: KgService):
@@ -187,6 +223,8 @@ def __init__(self, kg_service: KgService):
@orchestration_provider.step(retries=1, parents=[], timeout="360m")
async def kg_clustering(self, context: Context) -> dict:

start_time = time.time()

logger.info("Running KG Clustering")
input_data = get_input_data_dict(
context.workflow_input()["request"]
@@ -195,11 +233,12 @@ async def kg_clustering(self, context: Context) -> dict:

kg_clustering_results = await self.kg_service.kg_clustering(
collection_id=collection_id,
logger=create_hatchet_logger(context.log),
**input_data["kg_enrichment_settings"],
)

context.log(
f"Successfully ran kg clustering for collection {collection_id}: {json.dumps(kg_clustering_results)}"
f"Successfully ran kg clustering for collection {collection_id}: {json.dumps(kg_clustering_results)} in {time.time() - start_time:.2f} seconds"
)
logger.info(
f"Successfully ran kg clustering for collection {collection_id}: {json.dumps(kg_clustering_results)}"
Expand All @@ -220,10 +259,14 @@ async def kg_community_summary(self, context: Context) -> dict:
num_communities = context.step_output("kg_clustering")[
"kg_clustering"
][0]["num_communities"]

parallel_communities = min(100, num_communities)
total_workflows = math.ceil(num_communities / parallel_communities)
workflows = []

context.log(
f"Running KG Community Summary for {num_communities} communities, spawning {total_workflows} workflows"
)

for i in range(total_workflows):
offset = i * parallel_communities
workflows.append(
@@ -257,15 +300,19 @@ def __init__(self, kg_service: KgService):

@orchestration_provider.step(retries=1, timeout="360m")
async def kg_community_summary(self, context: Context) -> dict:

start_time = time.time()

input_data = get_input_data_dict(
context.workflow_input()["request"]
)

community_summary = await self.kg_service.kg_community_summary(
**input_data
logger=create_hatchet_logger(context.log),
**input_data,
)
context.log(
f"Successfully ran kg community summary for communities {input_data['offset']} to {input_data['offset'] + len(community_summary)}"
f"Successfully ran kg community summary for communities {input_data['offset']} to {input_data['offset'] + len(community_summary)} in {time.time() - start_time:.2f} seconds "
)
return {
"result": f"successfully ran kg community summary for communities {input_data['offset']} to {input_data['offset'] + len(community_summary)}"
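
The steps above now pass create_hatchet_logger(context.log) into the KG service instead of the raw context.log callable. The helper itself is imported from shared.utils and is not shown in this commit; the sketch below is only a guess at its shape — an adapter that gives Hatchet's per-step log a logging.Logger-style interface — and every name and detail in it is an assumption rather than the actual implementation.

# Assumed shape of a Hatchet logging adapter: wrap the context.log callable so
# service code can call .info() / .error() regardless of whether it runs under
# Hatchet or the simple orchestrator. Not the real shared.utils implementation.
from typing import Callable


class HatchetLogger:
    def __init__(self, log: Callable[[str], None]):
        self._log = log

    def _emit(self, level: str, message: str) -> None:
        # Forward a formatted line to Hatchet's per-step log stream.
        self._log(f"[{level}] {message}")

    def debug(self, message: str) -> None:
        self._emit("DEBUG", message)

    def info(self, message: str) -> None:
        self._emit("INFO", message)

    def warning(self, message: str) -> None:
        self._emit("WARNING", message)

    def error(self, message: str) -> None:
        self._emit("ERROR", message)


def create_hatchet_logger(log: Callable[[str], None]) -> HatchetLogger:
    return HatchetLogger(log)

With an adapter of that shape, kg_triples_extraction, kg_clustering, and kg_community_summary can report progress through the same logger parameter under either orchestration backend.
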
25 changes: 16 additions & 9 deletions py/core/main/orchestration/simple/kg_workflow.py
@@ -40,15 +40,22 @@ async def create_graph(input_data):

for _, document_id in enumerate(document_ids):
# Extract triples from the document
await service.kg_triples_extraction(
document_id=document_id,
**input_data["kg_creation_settings"],
)
# Describe the entities in the graph
await service.kg_entity_description(
document_id=document_id,
**input_data["kg_creation_settings"],
)

try:
await service.kg_triples_extraction(
document_id=document_id,
**input_data["kg_creation_settings"],
)
# Describe the entities in the graph
await service.kg_entity_description(
document_id=document_id,
**input_data["kg_creation_settings"],
)

except Exception as e:
logger.error(
f"Error in creating graph for document {document_id}: {e}"
)

async def enrich_graph(input_data):
