Streaming and Agent Routing using langgraph #213
Walkthrough
The pull request introduces significant enhancements to the agent infrastructure, focusing on asynchronous streaming capabilities and improved state management across multiple agent classes.
Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant ConversationService
    participant AgentSupervisor
    participant AgentFactory
    participant SpecificAgent
    User->>ConversationService: Send Query
    ConversationService->>AgentSupervisor: Process Query
    AgentSupervisor->>AgentFactory: Get Appropriate Agent
    AgentFactory-->>AgentSupervisor: Return Agent Instance
    AgentSupervisor->>SpecificAgent: Execute Query
    SpecificAgent-->>AgentSupervisor: Stream Results
    AgentSupervisor-->>ConversationService: Yield Streaming Results
    ConversationService-->>User: Stream Response
```
Actionable comments posted: 1
🧹 Nitpick comments (5)
app/modules/intelligence/agents/chat_agents/qna_chat_agent.py (2)
101-112
: Add inline documentation for the streaming approach.
The new “_stream_rag_agent” method is a great addition for real-time chunk streaming. Consider adding a docstring or comment explaining the expected chunk format and how the writer is intended to handle partial responses, especially if external tools consume the streamed data.
113-122
: Potential to document graph creation.
Your _create_graph method is concise and leverages StateGraph effectively. Adding a brief docstring describing the node/edge flow (i.e., from “START” to “rag_agent” to “END”) could help future maintainers understand the graph logic at a glance.

app/modules/intelligence/agents/agents/rag_agent.py (2)
13-13
: Remove unused import FileCallbackHandler if not utilized.
The callback handler import is not used anywhere in this file. Removing it helps streamline the code.

-from app.modules.intelligence.agents.agents.callback_handler import FileCallbackHandler
🧰 Tools
🪛 Ruff (0.8.2)
13-13: app.modules.intelligence.agents.agents.callback_handler.FileCallbackHandler imported but unused
Remove unused import: app.modules.intelligence.agents.agents.callback_handler.FileCallbackHandler (F401)
289-290
: Use a single 'with' statement for multiple contexts.
You can merge these nested context managers for cleaner flow.

- async def kickoff():
-     with os.fdopen(write_fd, "w", buffering=1) as write_file:
-         with redirect_stdout(write_file):
-             await rag_agent.run(
+ async def kickoff():
+     with os.fdopen(write_fd, "w", buffering=1) as write_file, redirect_stdout(write_file):
+         await rag_agent.run(

🧰 Tools
🪛 Ruff (0.8.2)
289-290: Use a single with statement with multiple contexts instead of nested with statements (SIM117)
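The single-`with` form suggested by SIM117 behaves identically to the nested version. A minimal, self-contained sketch (using a temp file in place of the PR's pipe; all names illustrative):

```python
import os
import tempfile
from contextlib import redirect_stdout

# One `with` covering both context managers, equivalent to the nested
# form flagged by SIM117. The temp file stands in for the pipe used in
# the PR under review.
path = os.path.join(tempfile.mkdtemp(), "out.log")
with open(path, "w", buffering=1) as write_file, redirect_stdout(write_file):
    print("streamed line")  # goes to the file, not the console

with open(path) as f:
    captured = f.read()
```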
app/modules/intelligence/agents/agents/callback_handler.py (1)
1-4
: Remove unused imports.
Static analysis suggests that “os”, “json”, and “typing.Optional” are not used in this file. Removing them will eliminate clutter.

-import os
-from datetime import datetime
-import json
-from typing import Any, Dict, List, Optional, Tuple, Union
+from datetime import datetime
+from typing import Any, Dict, List, Tuple, Union

🧰 Tools
🪛 Ruff (0.8.2)
1-1: os imported but unused
Remove unused import: os (F401)
3-3: json imported but unused
Remove unused import: json (F401)
4-4: typing.Optional imported but unused
Remove unused import: typing.Optional (F401)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- app/modules/intelligence/agents/agents/callback_handler.py (1 hunks)
- app/modules/intelligence/agents/agents/rag_agent.py (4 hunks)
- app/modules/intelligence/agents/chat_agents/qna_chat_agent.py (4 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
app/modules/intelligence/agents/agents/callback_handler.py
1-1: os
imported but unused
Remove unused import: os
(F401)
3-3: json
imported but unused
Remove unused import: json
(F401)
4-4: typing.Optional
imported but unused
Remove unused import: typing.Optional
(F401)
app/modules/intelligence/agents/agents/rag_agent.py
13-13: app.modules.intelligence.agents.agents.callback_handler.FileCallbackHandler
imported but unused
Remove unused import: app.modules.intelligence.agents.agents.callback_handler.FileCallbackHandler
(F401)
289-290: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
app/modules/intelligence/agents/chat_agents/qna_chat_agent.py
5-5: typing.Any
imported but unused
Remove unused import
(F401)
5-5: typing.Optional
imported but unused
Remove unused import
(F401)
6-6: typing.Annotated
imported but unused
Remove unused import: typing.Annotated
(F401)
12-12: langgraph.graph.message.add_messages
imported but unused
Remove unused import: langgraph.graph.message.add_messages
(F401)
🔇 Additional comments (4)
app/modules/intelligence/agents/chat_agents/qna_chat_agent.py (3)
92-98
: Good use of TypedDict for state clarity.
This class-based TypedDict properly documents the expected state fields, ensuring clarity and type safety. It’s a beneficial approach for strongly typed states.
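For reference, the class-based TypedDict pattern praised here looks roughly like the following; the field names are assumptions based on the review text, not the actual class under review:

```python
from typing import List, TypedDict

# Illustrative typed state; field names are guesses, not the real class.
class State(TypedDict):
    query: str
    project_id: str
    conversation_id: str
    citations: List[str]

# Type checkers flag missing or misspelled keys; at runtime it is a plain dict.
state: State = {
    "query": "explain the parser",
    "project_id": "p1",
    "conversation_id": "c1",
    "citations": [],
}
```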
123-139
: Nice asynchronous layering in 'run'.
The method seamlessly compiles the graph and yields streamed chunks. Invoking graph.astream
with “stream_mode='custom'” is a nice touch. Just ensure that all upstream consumers of this generator properly handle partial responses and any potential exceptions.
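A hedged sketch of what "handling partial responses and exceptions" can mean for a consumer of such a generator; the stream below is simulated, not langgraph's:

```python
import asyncio

async def fake_stream():
    # Stand-in for the compiled graph's async stream; yields partial chunks.
    for chunk in ["Hello", ", ", "world"]:
        yield chunk

async def consume():
    parts = []
    try:
        async for chunk in fake_stream():
            parts.append(chunk)  # keep partials even if the stream dies mid-way
    except Exception:
        pass  # a real consumer would log and surface whatever arrived so far
    return "".join(parts)

result = asyncio.run(consume())
```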
Line range hint 176-205
: Validate chunk usage in the RAG agent section.
Within this block, data from “kickoff_rag_agent” is streamed and persisted, then yielded as JSON. Check that each chunk is valid JSON, especially when partial tokens may be split. Consider logging or error-handling ephemeral parsing issues to avoid partial JSON outputs.
✅ Verification successful
Let me gather more information about the kickoff_rag_agent
implementation to understand how it handles chunks.
Let me try a different approach to find the RAG agent implementation.
Let me check the complete implementation of the RAG agent to understand how it handles chunks.
Based on the implementation review, I can now provide the final response:
The chunked JSON handling in the RAG agent is safe and properly structured.
The kickoff_rag_agent
function yields chunks that are complete lines of text, and the JSON structure in qna_chat_agent.py
is constructed with fixed fields ("citations" and "message") where only the content
variable is dynamic. The streaming implementation ensures that:
- Each chunk is a complete line from the RAG agent's output
- The JSON structure is consistently formed for each chunk
- The content is properly stringified before JSON encoding
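The per-chunk framing the bullets above describe can be illustrated like this (the helper name is hypothetical):

```python
import json

# Fixed keys, dynamic content: each yielded chunk is valid JSON on its own.
def frame_chunk(content, citations):
    return json.dumps({"citations": citations, "message": content})

chunks = [frame_chunk(line, []) for line in ["step one", "step two"]]
decoded = [json.loads(c) for c in chunks]  # every chunk parses independently
```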
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Purpose: Check that the chunked JSON returned by `kickoff_rag_agent` remains valid.
# Approach: Grep for lines that produce "json.dumps" to ensure the chunk is properly formed
# And ensure partial chunk lines do not break JSON.
rg -A 3 'json.dumps'
Length of output: 8330
Script:
#!/bin/bash
# Check the implementation of kickoff_rag_agent and its chunk handling
ast-grep --pattern 'async def kickoff_rag_agent($$$)'
Length of output: 55
Script:
#!/bin/bash
# Search for kickoff_rag_agent function using ripgrep for better context
rg -A 10 "def kickoff_rag_agent"
Length of output: 870
Script:
#!/bin/bash
# Get the full implementation of the RAG agent
cat app/modules/intelligence/agents/agents/rag_agent.py
Length of output: 13240
app/modules/intelligence/agents/agents/callback_handler.py (1)
7-67
: Well-structured FileCallbackHandler.
The callback setup, log file initialization, and step-based logging structure are clear and beneficial. Great job on enabling future extensibility using commented-out blocks for JSON parsing. No critical changes recommended here.
from typing import Any, AsyncGenerator, Dict, List, Optional
from typing import Annotated
🛠️ Refactor suggestion
Remove unused imports to improve clarity.
The static analysis hints correctly highlight that 'typing.Any', 'typing.Optional', and 'typing.Annotated' are not used in this file. Consider removing them.
Here is a suggested diff:
-from typing import Any, AsyncGenerator, Dict, List, Optional
-from typing import Annotated
+from typing import AsyncGenerator, Dict, List
🧰 Tools
🪛 Ruff (0.8.2)
5-5: typing.Any
imported but unused
Remove unused import
(F401)
5-5: typing.Optional
imported but unused
Remove unused import
(F401)
6-6: typing.Annotated
imported but unused
Remove unused import: typing.Annotated
(F401)
Actionable comments posted: 3
🧹 Nitpick comments (35)
app/modules/intelligence/agents/agent_classifier.py (4)
2-8
: Remove unused imports for better clarity and maintenance.
The static analysis shows that “BaseMessage” and “ClassificationResult” are never used. Removing them will keep the codebase clean and free of dead imports.

-from langchain.schema import BaseMessage
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.output_parsers import PydanticOutputParser
-from app.modules.intelligence.prompts.classification_prompts import ClassificationResult
 from pydantic import BaseModel, Field

🧰 Tools
🪛 Ruff (0.8.2)
2-2: langchain.schema.BaseMessage imported but unused
Remove unused import: langchain.schema.BaseMessage (F401)
8-8: app.modules.intelligence.prompts.classification_prompts.ClassificationResult imported but unused
Remove unused import: app.modules.intelligence.prompts.classification_prompts.ClassificationResult (F401)
21-41
: Consider using f-strings or shorter docstrings for improved readability
The prompt template is structured nicely. However, consider condensing or reformatting the multi-line string to ensure clarity and maintainability. For instance, you could leverage Python f-strings or single lines for each instruction if appropriate.
53-61
: Validate that the last message logic meets user experience expectations
Here, you only extract the last 10 messages and the last message’s content for classification. Verify whether there's any scenario in which older messages are also crucial for classification (e.g., for more extended conversation contexts).
Would you like assistance in adding an optional parameter to control how many historical messages get used for classification?
64-80
: Check error handling or fallback strategies
Currently, your classification chain simply awaits the language model. For better resilience, consider adding a retry mechanism or fallback classification if the chain fails.

app/modules/intelligence/agents/chat_agents/code_gen_chat_agent.py (4)
33-39
: Best practice: Provide docstrings for clarity.
Though the TypedDict fields are self-explanatory, adding a brief docstring for the State definition can help future contributors quickly understand its purpose.
50-56
: Validate your state graph transitions.
You add a node "code_gen_agent" and connect it from START to END. If more tasks need to be chained, consider hooking them into the graph as well.
64-75
: Add docstring or method comment explaining the run() method's responsibilities.
This method sets up the state for your graph, which is a key part of understanding your agent’s lifecycle. A quick explanation can help.
Line range hint
76-118
: Check partial completions or error handling for asynchronous code generation.
The code properly yields chunks. However, if partial errors occur during code generation, consider capturing partial logs or gracefully ending iteration to avoid abrupt breaks.

app/modules/intelligence/agents/agents/callback_handler.py (2)
21-23
: Add type hints for function arguments if feasible.
You have an annotated argument, but for thorough clarity, consider specifying the types in the docstring or function signature for each possible branch of “step_output.”
37-71
: Consider adding standard result logging.
You commented out JSON parsing for tool input and results. If this debugging feature is valuable, consider reintroducing or providing alternate logging routes.

app/modules/intelligence/agents/chat_agents/debugging_chat_agent.py (5)
88-96
: Document each field in the State TypedDict.
As your debugging agent grows, clarity on every field’s meaning (e.g., logs, stacktrace) will prove beneficial.
97-108
: Ensure streaming partial errors are captured gracefully.
While you yield chunks, handle scenarios where an exception might arise mid-stream, so that partial responses or debug details are preserved.
109-115
: Verify transitions in your state graph.
Like in other agents, confirm that any extended debugging tasks or fallback steps are accounted for if “rag_agent” is insufficient.
125-138
: Add docstring on the run() method specifying how and when debugging logs or stack traces are used.
This ensures new developers quickly grasp how these parameters affect the debugging flow.
Line range hint
139-196
: Refine classification error handling logic.
If the classification chain or RAG process fails, it logs the exception, but the user’s error message is a single string. Consider returning JSON with an “error” field for consistent output.

app/modules/intelligence/agents/agents/low_level_design_agent.py (2)
180-181
: Refactor nested with statements for readability.
Ruff (SIM117) suggests combining nested with statements. You can merge them into a single statement:

- with os.fdopen(write_fd, "w", buffering=1) as write_file:
-     with redirect_stdout(write_file):
+ with os.fdopen(write_fd, "w", buffering=1) as write_file, redirect_stdout(write_file):
      crew = Crew(
      ...

🧰 Tools
🪛 Ruff (0.8.2)
180-181: Use a single with statement with multiple contexts instead of nested with statements (SIM117)
213-218
: Use consistent naming and error handling.
The helper function and the async generator align well with the new streaming approach. However, ensure that any exceptions raised inside design_agent.run() are either caught or properly propagated to the caller.

app/modules/intelligence/agents/chat_agents/lld_chat_agent.py (4)
16-17
: Validate new dependencies and classes.
You’ve introduced StateGraph and StreamWriter from langgraph, and TypedDict from typing_extensions. Ensure that dependencies are installed and pinned to a stable version to prevent unexpected import errors.Also applies to: 19-19
118-129
: Consider capturing chain creation errors.
If_create_chain
fails or returns None, the subsequent astream might yield incomplete responses. Consider early detection or fallback to an error message.
130-136
: Ensure correct ordering of argument validations in 'execute'.
Before classifying or building chains, consider checking if query/project_id are well-formed. Fail early to reduce wasted overhead.
Line range hint
166-192
: Monitor message buffer size while streaming.
Large amounts of streamed content could accumulate in the buffer, potentially delaying flushes. Consider a periodic flush or memory threshold-based approach if you expect large responses.app/modules/intelligence/agents/agents/debug_rag_agent.py (3)
242-250
: Parameter naming clarity.
The function’s expanded parameters (e.g.,chat_history
,file_structure
variables) are descriptive. Document each param’s expected format (especially if you anticipate further expansions).
256-257
: Refactor nested with statements.
Following Ruff’s (SIM117) suggestion, you can merge them for clearer readability.- with os.fdopen(write_fd, "w", buffering=1) as write_file: - with redirect_stdout(write_file): + with os.fdopen(write_fd, "w", buffering=1) as write_file, redirect_stdout(write_file): crew = Crew( ...🧰 Tools
🪛 Ruff (0.8.2)
256-257: Use a single
with
statement with multiple contexts instead of nestedwith
statements(SIM117)
268-282
: Consider progressive partial streaming.
Right now, you wait until "## Final Answer:" to trigger final answer streaming. If you need partial streaming of debug logs earlier, consider relaxing the condition or introducing earlier yield points.app/modules/intelligence/agents/agents/code_gen_agent.py (1)
266-266
: Maintain consistent docstrings for new return type.
Reflect in docstrings that ‘run’ now streams chunks of text, not a single string. This helps other developers or tools that rely on docstrings for usage info.app/modules/conversations/conversation/conversation_service.py (3)
2-2
: Remove unused imports as per static analysis hints.
Both 'json' and 'fastapi.HTTPException' appear to be unused, causing clutter.- import json - from fastapi import HTTPExceptionAlso applies to: 8-8
🧰 Tools
🪛 Ruff (0.8.2)
2-2:
json
imported but unusedRemove unused import:
json
(F401)
81-122
: Constructor clarity and error handling.
The constructor sets up agent references but uses the uninitialized 'self.classifier' field. If there's an intention to use a dedicated classifier object, consider removing or initializing it for explicit clarity.
198-212
: Robustness in query processing.
Your approach to dynamically build state and graph is clean, but ensure upstream code handles edge cases like empty 'query' or missing 'agent_id' arguments.app/modules/intelligence/agents/agent_factory.py (3)
14-19
: AgentFactory constructor usage.
Maintaining a simple agent cache is a smart approach. Check for memory usage if number of user IDs or agent IDs grows significantly.
20-32
: Cache key structure.
Using “agent_id_user_id” as a cache key is straightforward. However, if you incorporate roles or scopes in the future, keep in mind you may need a more granular cache key.
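A minimal sketch of the cache-key scheme described above; class and agent names here are illustrative, not the project's real ones:

```python
class EchoAgent:
    def __init__(self, agent_id: str, user_id: str):
        self.agent_id, self.user_id = agent_id, user_id

class AgentFactory:
    def __init__(self):
        self._cache = {}
        self._registry = {"echo": EchoAgent}  # hypothetical agent id

    def get_agent(self, agent_id: str, user_id: str):
        key = f"{agent_id}_{user_id}"  # the "agent_id_user_id" key from the review
        if key not in self._cache:
            self._cache[key] = self._registry[agent_id](agent_id, user_id)
        return self._cache[key]

factory = AgentFactory()
first = factory.get_agent("echo", "u1")
second = factory.get_agent("echo", "u1")  # served from the cache
```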
34-55
: Factory method logic.
The mapping structure is clean and extensible. If you plan to handle many agent variants, consider auto-discovery or dynamic import to avoid large mappings.app/modules/intelligence/agents/chat_agents/unit_test_chat_agent.py (1)
101-101
: Return with context.
When node_ids are empty, you provide immediate feedback and end the generator. This is clear and user-friendly, though consider a more direct error or a user prompt to indicate next steps.app/modules/intelligence/agents/chat_agents/qna_chat_agent.py (1)
123-136
: Orchestration of QNA flow.
Storing and streaming states is implemented cleanly. Verify any edge cases for empty or None fields in 'state'.app/modules/intelligence/agents/agents/rag_agent.py (2)
77-77
: Consider implementing or removing the commented FileCallbackHandler code.
The commented lines suggest a planned integration of step-by-step logging. Either implement this feature or remove the commented code to maintain code cleanliness.
Would you like me to help implement the FileCallbackHandler integration or create an issue to track this task?
Also applies to: 108-108
284-292
: Consider using higher-level async primitives.
The current implementation uses low-level pipe operations. Consider using higher-level async primitives like asyncio.Queue or asyncio.StreamWriter/StreamReader for better maintainability.
Also, combine the nested with statements for better readability:

- with os.fdopen(write_fd, "w", buffering=1) as write_file:
-     with redirect_stdout(write_file):
+ with os.fdopen(write_fd, "w", buffering=1) as write_file, redirect_stdout(write_file):

🧰 Tools
🪛 Ruff (0.8.2)
287-288: Use a single with statement with multiple contexts instead of nested with statements (SIM117)
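The asyncio.Queue alternative mentioned above could be sketched as follows; it replaces only the pipe plumbing, not the crew logic, and all names are illustrative:

```python
import asyncio

async def producer(queue):
    # Stand-in for the crew writing its output; None is the end sentinel.
    for line in ["analyzing repo", "drafting answer"]:
        await queue.put(line)
    await queue.put(None)

async def stream():
    queue = asyncio.Queue()
    task = asyncio.create_task(producer(queue))
    chunks = []
    while (item := await queue.get()) is not None:
        chunks.append(item)
    await task  # surface any producer exception
    return chunks

received = asyncio.run(stream())
```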
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
- app/modules/conversations/conversation/conversation_service.py (5 hunks)
- app/modules/intelligence/agents/agent_classifier.py (1 hunks)
- app/modules/intelligence/agents/agent_factory.py (1 hunks)
- app/modules/intelligence/agents/agents/callback_handler.py (1 hunks)
- app/modules/intelligence/agents/agents/code_gen_agent.py (4 hunks)
- app/modules/intelligence/agents/agents/debug_rag_agent.py (4 hunks)
- app/modules/intelligence/agents/agents/low_level_design_agent.py (3 hunks)
- app/modules/intelligence/agents/agents/rag_agent.py (4 hunks)
- app/modules/intelligence/agents/chat_agents/code_gen_chat_agent.py (4 hunks)
- app/modules/intelligence/agents/chat_agents/debugging_chat_agent.py (6 hunks)
- app/modules/intelligence/agents/chat_agents/lld_chat_agent.py (5 hunks)
- app/modules/intelligence/agents/chat_agents/qna_chat_agent.py (5 hunks)
- app/modules/intelligence/agents/chat_agents/unit_test_chat_agent.py (1 hunks)
- app/modules/parsing/graph_construction/code_graph_service.py (0 hunks)
- app/modules/parsing/graph_construction/parsing_repomap.py (1 hunks)
💤 Files with no reviewable changes (1)
- app/modules/parsing/graph_construction/code_graph_service.py
🧰 Additional context used
🪛 Ruff (0.8.2)
app/modules/intelligence/agents/agents/rag_agent.py
287-288: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
app/modules/intelligence/agents/agents/code_gen_agent.py
286-287: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
app/modules/intelligence/agents/agent_classifier.py
2-2: langchain.schema.BaseMessage
imported but unused
Remove unused import: langchain.schema.BaseMessage
(F401)
8-8: app.modules.intelligence.prompts.classification_prompts.ClassificationResult
imported but unused
Remove unused import: app.modules.intelligence.prompts.classification_prompts.ClassificationResult
(F401)
app/modules/intelligence/agents/agents/low_level_design_agent.py
180-181: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
app/modules/intelligence/agents/agents/debug_rag_agent.py
256-257: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
app/modules/conversations/conversation/conversation_service.py
2-2: json
imported but unused
Remove unused import: json
(F401)
8-8: fastapi.HTTPException
imported but unused
Remove unused import: fastapi.HTTPException
(F401)
77-77: Redefinition of unused END
from line 9
Remove definition: END
(F811)
77-77: Redefinition of unused StateGraph
from line 9
Remove definition: StateGraph
(F811)
78-78: Redefinition of unused Command
from line 10
Remove definition: Command
(F811)
79-79: Redefinition of unused AsyncGenerator
from line 5
Remove definition: AsyncGenerator
(F811)
79-79: Redefinition of unused Dict
from line 5
Remove definition: Dict
(F811)
79-79: Redefinition of unused Any
from line 5
Remove definition: Any
(F811)
🔇 Additional comments (31)
app/modules/intelligence/agents/agent_classifier.py (1)
43-52
: Ensure default assignment for empty messages is intentional
When there are no messages, you default to the first agent with confidence=0.0. Double-check that this fallback aligns with your intended agent-selection logic.
app/modules/intelligence/agents/chat_agents/code_gen_chat_agent.py (1)
40-49
: Leverage writer patterns carefully.
The _stream_code_gen_agent method streams chunks into writer. Ensure writer failures (e.g., I/O issues) are handled gracefully.
app/modules/intelligence/agents/agents/callback_handler.py (1)
78-115
: Verify uniform formatting for observations.
You have a nice approach to parse lines starting with "Title:", "Link:", or "Snippet:". Ensure that no additional search-like keys are missed (e.g., “Summary:”).
app/modules/intelligence/agents/agents/low_level_design_agent.py (2)
171-171
: Good to see the new streaming return type.
Returning an AsyncGenerator improves responsiveness and allows the client to consume partial results as they become available.
177-204
: Ensure resource cleanup and robust error handling for pipe-based streaming.
While the use of a pipe and redirecting stdout is valid for streaming logs, consider supplementing with explicit exception handling within the async task to avoid resource leaks if the task fails unexpectedly.
🧰 Tools
🪛 Ruff (0.8.2)
180-181: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
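A hedged sketch of the cleanup concern raised above: even if the redirected work raises, the `with` block closes the write end, so the reading side still reaches EOF instead of hanging (names illustrative, POSIX `os.pipe` assumed):

```python
import asyncio
import os
from contextlib import redirect_stdout

async def demo():
    read_fd, write_fd = os.pipe()

    async def kickoff():
        # The `with` block closes write_fd whether or not the body raises,
        # so the reader below cannot block forever waiting for EOF.
        with os.fdopen(write_fd, "w", buffering=1) as write_file, redirect_stdout(write_file):
            print("design step 1")

    await asyncio.create_task(kickoff())
    with os.fdopen(read_fd) as read_file:
        return read_file.read()

output = asyncio.run(demo())
```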
app/modules/intelligence/agents/chat_agents/lld_chat_agent.py (4)
7-7
: Avoid removing necessary imports unless confirmed unused.
You removed the SystemMessage import from langchain, but you’re still referencing different message types from langchain. Double-check that the removal wasn’t needed for future expansions or debugging.
87-93
: TypedDict 'State' fosters clarity, but watch for runtime constraints.
TypedDict is great for typed state management, but it doesn’t enforce runtime checks. If you deal with untrusted data, consider additional validations.
94-103
: Check concurrency in _stream_rag_agent.
Ensuring that multiple concurrent calls to this method won’t conflict is crucial, especially if writer references shared objects. If concurrency is possible, confirm that shared resources are thread-safe or create distinct writer instances per call.
104-110
: Graph-based approach is well-structured.
This approach fosters modular flow control. Keep in mind the overhead of building, compiling, and running the state graph. If performance is critical, measure carefully.
app/modules/intelligence/agents/agents/debug_rag_agent.py (3)
1-4
: Imports updated for asynchronous streaming.
These changes align with the shift to an AsyncGenerator. Ensure agentops
usage is integrated consistently across your codebase and confirm no leftover references to old synchronous patterns.
233-233
: Asynchronous approach with generator is beneficial.
The run
method’s new AsyncGenerator return type is consistent with the rest of the system. This lets you stream partial debug info promptly.
253-264
: Avoid overshadowing file descriptors.
When opening pipe FDs with os.fdopen
, ensure you close them after use. The nested function closure could retain references, potentially causing resource leaks in complex usage patterns.
🧰 Tools
🪛 Ruff (0.8.2)
256-257: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
app/modules/intelligence/agents/agents/code_gen_agent.py (3)
1-4
: Transition to AsyncGenerator for code generation is a good step.
Streaming code changes helps with incremental feedback, especially if code is large or complex.
283-284
: Guard against pipe usage on unsupported platforms.
If there’s a chance this is used on Windows, confirm os.pipe()
meets the environment’s constraints or provide fallback.
298-310
: Incremental streaming logic is consistent across agents.
Ensuring all agents use the same streaming approach simplifies debugging. Keep an eye on large output volumes; consider chunking to avoid large memory spikes.
app/modules/conversations/conversation/conversation_service.py (6)
123-131
: State structure usage.
Defining a TypedDict for your state is a good approach. Ensure that related code references do not rely on missing fields.
132-163
: Classification fallback logic.
When confidence is < 0.5 or the agent doesn’t exist, the fallback returns the state’s 'agent_id' but does not confirm if it’s a valid fallback. Consider verifying the fallback agent ID is valid or else it may raise downstream errors.
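The verification suggested above might look like this; the 0.5 threshold mirrors the review, while the agent ids and helper name are hypothetical:

```python
AVAILABLE_AGENTS = {"qna_agent", "debug_agent"}  # illustrative ids
DEFAULT_AGENT = "qna_agent"

def resolve_agent(classified_id, confidence, current_id, threshold=0.5):
    if confidence >= threshold and classified_id in AVAILABLE_AGENTS:
        return classified_id
    # Fall back, but verify the fallback itself is a known agent.
    return current_id if current_id in AVAILABLE_AGENTS else DEFAULT_AGENT

high = resolve_agent("debug_agent", 0.9, "qna_agent")
low = resolve_agent("unknown_agent", 0.2, "stale_id")
```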
165-180
: Consistency in streaming approach.
The 'agent_node' method yields chunks from multiple agents. Ensure concurrency is well-handled (e.g., no collisions in writing).
181-197
: Graph building is comprehensive.
The approach to compile a StateGraph with classifier and variable agent nodes is a good design for modular expansions.
598-599
: Initiating supervisor.
Creating a new Supervisor for each call might have performance costs if you're repeatedly calling this method. Consider a more persistent approach if usage is frequent.
615-616
: Streaming control for chunk-based output.
Your streaming logic is straightforward. Verify that partial outputs do not contain partially formed JSON or user-submitted partial data that might break the client.
app/modules/intelligence/agents/agent_factory.py (1)
1-2
: Well-managed imports.
Imports are necessary and relevant (Dict, Any, Session). No concerns.
app/modules/intelligence/agents/chat_agents/unit_test_chat_agent.py (1)
99-101
: Citations initialization.
Initializing citations as an empty list is correct. This ensures no residual state from previous runs.
app/modules/intelligence/agents/chat_agents/qna_chat_agent.py (6)
7-7
: Import usage verification.
All imports here ('AsyncGenerator', 'Dict', 'List') appear relevant.
16-19
: TypedDict usage.
Combining 'TypedDict' with 'StateGraph' patterns is a neat approach for typed workflows.
87-93
: State definition for QNAChatAgent
The new State
typed dict is consistent with typed states in other agents. Good addition for clarity.
94-103
: Asynchronous streaming method.
The _stream_rag_agent
method clarifies the streaming approach, but be cautious with shared resources or concurrency issues in large-scale usage.
104-115
: Graph creation logic.
The _create_graph
method sets a straightforward pipeline for the agent’s streaming workflow. This is a nice, modular approach.
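A library-free sketch of the START → rag_agent → END pipeline this comment describes; the node name mirrors the review, but the toy walker below is not langgraph:

```python
START, END = "__start__", "__end__"

def rag_agent(state):
    # A real node would stream chunks; this one just records an answer.
    return {**state, "response": f"answer to: {state['query']}"}

nodes = {"rag_agent": rag_agent}
edges = {START: "rag_agent", "rag_agent": END}

def run_graph(state):
    current = edges[START]
    while current != END:
        state = nodes[current](state)
        current = edges[current]
    return state

final = run_graph({"query": "how is parsing wired?"})
```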
Line range hint 137-201
: Execution with partial results.
The conditional flow depending on classification is well structured. Verify that partial JSON responses are handled properly in the client, especially if streaming is interrupted mid-stream.
app/modules/intelligence/agents/agents/rag_agent.py (1)
275-275
: LGTM: Return type correctly reflects streaming capability.
The change to AsyncGenerator[str, None]
properly indicates that the function now streams its results asynchronously.
app/modules/parsing/graph_construction/parsing_repomap.py (1)
28-28
: LGTM: Clear documentation of code origin and customization.
The comment effectively documents the code's origin and its customization for potpie's specific needs.
```python
async def kickoff():
    with os.fdopen(write_fd, "w", buffering=1) as write_file:
        with redirect_stdout(write_file):
            crew = Crew(
                agents=[code_generator],
                tasks=[generation_task],
                process=Process.sequential,
                verbose=True,
            )
            await crew.kickoff_async()
```
🛠️ Refactor suggestion
Resource release in 'kickoff'
Similar to other files, combining nested with statements can reduce complexity. Also, handle exceptions carefully, ensuring the pipe is closed even on failure.
```diff
-    with os.fdopen(write_fd, "w", buffering=1) as write_file:
-        with redirect_stdout(write_file):
+    with os.fdopen(write_fd, "w", buffering=1) as write_file, redirect_stdout(write_file):
         crew = Crew(
             ...
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
async def kickoff():
    with os.fdopen(write_fd, "w", buffering=1) as write_file, redirect_stdout(write_file):
        crew = Crew(
            agents=[code_generator],
            tasks=[generation_task],
            process=Process.sequential,
            verbose=True,
        )
        await crew.kickoff_async()
```
🧰 Tools
🪛 Ruff (0.8.2)
286-287: Use a single `with` statement with multiple contexts instead of nested `with` statements (SIM117)
```python
from langgraph.graph import END, StateGraph
from langgraph.types import Command
from typing import AsyncGenerator, Dict, Any
```
Remove redundant redefinitions.
There appears to be a redefinition of END, StateGraph, and Command, which are already imported at lines 9-10. Remove lines 77-79 to prevent naming conflicts and warnings.
```diff
-from langgraph.graph import END, StateGraph
-from langgraph.types import Command
-from typing import AsyncGenerator, Dict, Any
```
🧰 Tools
🪛 Ruff (0.8.2)
77: Redefinition of unused `END` from line 9; remove definition (F811)
77: Redefinition of unused `StateGraph` from line 9; remove definition (F811)
78: Redefinition of unused `Command` from line 10; remove definition (F811)
79: Redefinition of unused `AsyncGenerator` from line 5; remove definition (F811)
79: Redefinition of unused `Dict` from line 5; remove definition (F811)
79: Redefinition of unused `Any` from line 5; remove definition (F811)
```python
# Yield CrewAgent logs as they are written to the pipe
final_answer_streaming = False
async with aiofiles.open(read_fd, mode="r") as read_file:
    async for line in read_file:
        if not line:
            break
        else:
            if final_answer_streaming:
                if line.endswith("\x1b[00m\n"):
                    yield line[:-6]
                else:
                    yield line
            if "## Final Answer:" in line:
                final_answer_streaming = True
```
🛠️ Refactor suggestion
Improve robustness of streaming implementation.
Consider the following improvements:
- Extract "## Final Answer:" as a constant to avoid magic strings
- Use a proper ANSI escape sequence parser instead of string manipulation
- Add error handling for the case when the pipe is closed unexpectedly
```diff
+FINAL_ANSWER_MARKER = "## Final Answer:"
+
 async def kickoff_rag_agent(...):
     # ... existing code ...
     final_answer_streaming = False
     async with aiofiles.open(read_fd, mode="r") as read_file:
+        try:
             async for line in read_file:
                 if not line:
                     break
                 else:
                     if final_answer_streaming:
-                        if line.endswith("\x1b[00m\n"):
-                            yield line[:-6]
-                        else:
-                            yield line
-                    if "## Final Answer:" in line:
+                        # Use proper ANSI escape sequence handling
+                        clean_line = strip_ansi_sequences(line)
+                        yield clean_line
+                    if FINAL_ANSWER_MARKER in line:
                         final_answer_streaming = True
+        except Exception as e:
+            logging.error(f"Error during streaming: {e}")
+            raise
```
Committable suggestion skipped: line range outside the PR's diff.
Quality Gate failed: Failed conditions
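The strip_ansi_sequences helper referenced in the suggestion is not defined anywhere in the diff; a minimal regex-based sketch, covering only CSI color/reset codes like `\x1b[00m` rather than every escape class, might look like:

```python
import re

# CSI SGR sequences such as "\x1b[00m"; a fuller pattern would also cover
# cursor-movement and other escape classes
ANSI_CSI_RE = re.compile(r"\x1b\[[0-9;]*m")

def strip_ansi_sequences(line: str) -> str:
    """Remove ANSI color/reset codes, leaving the visible text."""
    return ANSI_CSI_RE.sub("", line)

print(strip_ansi_sequences("## Final Answer:\x1b[00m done"))  # ## Final Answer: done
```

This avoids the brittle `line[:-6]` slice, which silently corrupts output if the terminal emits a differently sized reset sequence.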
Actionable comments posted: 2
🔭 Outside diff range comments (1)
app/modules/intelligence/agents/chat_agents/qna_chat_agent.py (1)
Line range hint 172-200: Enhance error handling in streaming implementation.
The streaming implementation could be improved:
- No timeout handling for long-running operations
- No backpressure handling for slow consumers
- No cleanup of resources in case of errors
Consider adding:
```diff
 async for chunk in kickoff_rag_agent(
     query,
     project_id,
     [
         msg.content
         for msg in validated_history
         if isinstance(msg, HumanMessage)
     ],
     node_ids,
     self.db,
     self.llm,
     self.mini_llm,
     user_id,
 ):
+    try:
+        content = str(chunk)
+        await asyncio.wait_for(
+            self.history_manager.add_message_chunk(
+                conversation_id,
+                content,
+                MessageType.AI_GENERATED,
+                citations=citations,
+            ),
+            timeout=5.0,
+        )
+        yield json.dumps(
+            {
+                "citations": citations,
+                "message": content,
+            }
+        )
+    except asyncio.TimeoutError:
+        logger.error("Timeout while processing chunk")
+        break
+    except Exception as e:
+        logger.error(f"Error processing chunk: {str(e)}")
+        break
```
🧹 Nitpick comments (3)
app/modules/intelligence/agents/agents_service.py (1)
Line 55: Consider clarifying the "blast radius" terminology.
The description changes from "detailed analysis" to "blast radius" of code changes. While this better reflects the agent's focus on impact analysis, the term might not be immediately clear to all users.
Consider expanding the description to be more explicit:
```diff
-description="An agent specialized in generating blast radius of the code changes in your current branch compared to default branch. Use this for functional review of your code changes. Works best with Py, JS, TS",
+description="An agent specialized in analyzing the impact and dependencies affected by code changes in your current branch compared to default branch (blast radius). Use this for functional review of your code changes. Works best with Py, JS, TS",
```
app/modules/intelligence/agents/agent_factory.py (1)
Lines 35-50: Consider adding a cache eviction strategy.
While the caching implementation is good, there's no mechanism to prevent unbounded growth of the cache. Consider:
- Adding a maximum cache size
- Implementing an LRU eviction policy
- Adding cache entry expiration
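As a sketch of the LRU option, a small bounded cache built on OrderedDict (the class is hypothetical, not part of AgentFactory):

```python
from collections import OrderedDict
from typing import Any, Optional

class LRUAgentCache:
    """Bounded cache: evicts the least recently used agent once max_size is exceeded."""

    def __init__(self, max_size: int = 16):
        self.max_size = max_size
        self._cache: OrderedDict = OrderedDict()

    def get(self, agent_id: str) -> Optional[Any]:
        if agent_id not in self._cache:
            return None
        self._cache.move_to_end(agent_id)  # mark as most recently used
        return self._cache[agent_id]

    def put(self, agent_id: str, agent: Any) -> None:
        self._cache[agent_id] = agent
        self._cache.move_to_end(agent_id)
        if len(self._cache) > self.max_size:
            self._cache.popitem(last=False)  # drop the oldest entry

cache = LRUAgentCache(max_size=2)
cache.put("debugging_agent", "agent-a")
cache.put("codebase_qna_agent", "agent-b")
cache.get("debugging_agent")             # touch: now most recently used
cache.put("unit_test_agent", "agent-c")  # evicts codebase_qna_agent
print(cache.get("codebase_qna_agent"))   # None
```

functools.lru_cache is not a drop-in here because agent construction depends on per-request LLM instances, so an explicit cache object keyed by agent_id is the simpler fit.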
app/modules/conversations/conversation/conversation_service.py (1)
Line range hint 622-640: Add retry mechanism for agent initialization.
The agent initialization and processing could fail due to temporary issues. Consider adding a retry mechanism:
```diff
+from tenacity import retry, stop_after_attempt, wait_exponential
+
+@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
+async def _initialize_supervisor(self, supervisor, user_id):
+    await supervisor.initialize(user_id)
+
 async def _generate_and_stream_ai_response(...):
     supervisor = SimplifiedAgentSupervisor(self.sql_db, self.provider_service)
-    await supervisor.initialize(user_id)
+    await self._initialize_supervisor(supervisor, user_id)
```
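If taking on the tenacity dependency is undesirable, a stdlib-only backoff helper along these lines (names hypothetical) achieves the same retry behavior:

```python
import asyncio

async def retry_async(fn, attempts: int = 3, base_delay: float = 0.01):
    """Await fn() up to `attempts` times with exponential backoff between failures."""
    for attempt in range(attempts):
        try:
            return await fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * (2 ** attempt))

calls = {"count": 0}

async def flaky_initialize():
    # Fails twice, then succeeds, mimicking a transient initialization error
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "initialized"

print(asyncio.run(retry_async(flaky_initialize)))  # initialized
```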
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- app/modules/conversations/conversation/conversation_service.py (5 hunks)
- app/modules/intelligence/agents/agent_factory.py (1 hunks)
- app/modules/intelligence/agents/agents_service.py (1 hunks)
- app/modules/intelligence/agents/chat_agents/qna_chat_agent.py (5 hunks)
🔇 Additional comments (3)
app/modules/intelligence/agents/agents_service.py (2)
Line 55: Ensure streaming support in code_changes_agent implementation
Given that this PR focuses on streaming capabilities, and blast radius analysis could be computationally intensive, we should verify that the code_changes_agent implementation supports streaming its results.
✅ Verification successful
Code changes agent has proper streaming implementation
The code_changes_chat_agent.py implements streaming through AsyncGenerator and yields JSON responses with citations and messages. The implementation follows the same streaming pattern used across other agents in the codebase:
- Uses AsyncGenerator[str, None] return type for run() method
- Implements streaming via yield statements
- Returns structured JSON responses with citations and messages
- Handles errors through streaming as well
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if code_changes_agent implementation supports streaming
# Look for async/streaming patterns in the agent implementation
echo "Searching for async/streaming patterns in code_changes_agent..."
rg -A 5 -B 5 'async def|yield|aiter|AsyncGenerator' --type py -g 'app/modules/intelligence/agents/**/*agent*.py'
# Look for the callback handler integration
echo "Searching for callback handler usage..."
rg -A 3 'FileCallbackHandler' --type py -g 'app/modules/intelligence/agents/**'
Length of output: 69304
Line 55: Verify language support capabilities for blast radius analysis
The description now explicitly states "Works best with Py, JS, TS". Let's verify if the underlying implementation supports these languages for blast radius analysis.
✅ Verification successful
Language support for blast radius analysis is accurately described
The description's claim about language support (Py, JS, TS) is accurate based on the implementation evidence:
The blast radius analysis is implemented through a dedicated BlastRadiusAgent class that uses:
- A change detection tool that fetches code changes
- A knowledge graph querying system for codebase analysis
- A structured analysis approach that examines:
  - Direct changes
  - Indirect effects
  - Critical areas
  - Multiple API/Consumer impacts

The agent is language-agnostic in its core implementation, relying on:
- AST-based change detection
- Knowledge graph queries for dependency analysis
- Pydantic models for structured output
The "Works best with Py, JS, TS" statement is appropriate as it indicates preference rather than limitation, which aligns with the implementation's capabilities to analyze code changes across these common languages.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if the code_changes_agent implementation has specific handlers for Python, JavaScript, and TypeScript
# Look for language-specific analysis patterns in the codebase
echo "Searching for language-specific analysis patterns..."
rg -l --type-add 'agent:*.{py,js,ts}' -g 'app/modules/intelligence/agents/**' -e 'python|javascript|typescript|\.py|\.js|\.ts' --stats
# Look for the actual implementation of code_changes_agent
echo "Searching for code_changes_agent implementation..."
ast-grep --pattern 'class CodeChangesAgent' -l
Length of output: 1366
Script:
#!/bin/bash
# Let's try a different approach to find the code changes agent implementation
# Search for code changes agent class with proper language specification
echo "Searching for code changes agent implementation..."
ast-grep --pattern 'class CodeChangesAgent' --lang python
# Search for language-specific analysis patterns in code changes related files
echo "Searching for language support in code changes implementation..."
rg -A 5 -B 5 "python|javascript|typescript|\.py|\.js|\.ts" app/modules/intelligence/agents/chat_agents/code_changes_chat_agent.py
# Look for file extension handling or language detection logic
echo "Searching for file extension handling..."
rg -A 5 "\.py|\.js|\.ts" app/modules/intelligence/agents/chat_agents/code_changes_chat_agent.py
# Search for blast radius analysis implementation
echo "Searching for blast radius analysis logic..."
rg -A 10 "blast.?radius" app/modules/intelligence/agents/**
Length of output: 16043
app/modules/intelligence/agents/agent_factory.py (1)
Lines 29-34: LGTM! Clean initialization with proper dependency injection.
The class follows good practices:
- Clear dependency injection of required services
- Type hints for better code clarity
- Efficient caching strategy using dictionary
```python
def _create_agent(
    self, agent_id: str, mini_llm, reasoning_llm, user_id: str
) -> Any:
    """Create a new agent instance"""
    agent_map = {
        "debugging_agent": lambda: DebuggingChatAgent(
            mini_llm, reasoning_llm, self.db
        ),
        "codebase_qna_agent": lambda: QNAChatAgent(
            mini_llm, reasoning_llm, self.db
        ),
        "unit_test_agent": lambda: UnitTestAgent(mini_llm, reasoning_llm, self.db),
        "integration_test_agent": lambda: IntegrationTestChatAgent(
            mini_llm, reasoning_llm, self.db
        ),
        "code_changes_agent": lambda: CodeChangesChatAgent(
            mini_llm, reasoning_llm, self.db
        ),
        "LLD_agent": lambda: LLDChatAgent(mini_llm, reasoning_llm, self.db),
        "code_generation_agent": lambda: CodeGenerationChatAgent(
            mini_llm, reasoning_llm, self.db
        ),
    }

    if agent_id in agent_map:
        return agent_map[agent_id]()

    # If not a system agent, create custom agent
    return CustomAgent(
        llm=reasoning_llm, db=self.db, agent_id=agent_id, user_id=user_id
    )
```
🛠️ Refactor suggestion
Consider enhancing error handling and agent validation.
The agent creation logic could be improved:
- No validation of agent_id format/structure
- Silent fallback to CustomAgent might mask configuration errors
- No error handling for agent instantiation failures
Here's a suggested improvement:
```diff
 def _create_agent(self, agent_id: str, mini_llm, reasoning_llm, user_id: str) -> Any:
     """Create a new agent instance"""
+    if not isinstance(agent_id, str) or not agent_id.strip():
+        raise ValueError("Invalid agent_id provided")
+
     agent_map = {
         "debugging_agent": lambda: DebuggingChatAgent(
             mini_llm, reasoning_llm, self.db
         ),
         # ... other agents ...
     }
+    try:
+        if agent_id in agent_map:
+            return agent_map[agent_id]()
+
+        logger.info(f"No system agent found for {agent_id}, creating custom agent")
+        return CustomAgent(
+            llm=reasoning_llm, db=self.db, agent_id=agent_id, user_id=user_id
+        )
+    except Exception as e:
+        logger.error(f"Failed to create agent {agent_id}: {str(e)}")
+        raise
```
Committable suggestion skipped: line range outside the PR's diff.
```python
            - Need different expertise: `ml_training_agent|0.85`
            - Overlapping domains, choose more specialized: `choose_higher_expertise_agent|0.80`
        """

        # Format agent descriptions for the prompt
        self.agent_descriptions = "\n".join(
            [f"- {agent.id}: {agent.description}" for agent in available_agents]
        )

    class State(TypedDict):
        query: str
        project_id: str
        conversation_id: str
        response: Optional[str]
        agent_id: Optional[str]
        user_id: str
        node_ids: List[NodeContext]

    async def classifier_node(self, state: State) -> Command:
        """Classifies the query and routes to appropriate agent"""
        if not state.get("query"):
            return Command(update={"response": "No query provided"}, goto=END)

        # Classification using LLM with enhanced prompt
        prompt = self.classifier_prompt.format(
            query=state["query"],
            agent_id=state["agent_id"],
            agent_descriptions=self.agent_descriptions,
        )
        response = await self.llm.ainvoke(prompt)

        # Parse response
        try:
            agent_id, confidence = response.content.split("|")
            confidence = float(confidence)
        except (ValueError, TypeError):
            return Command(
                update={"response": "Error in classification format"}, goto=END
            )

        if confidence < 0.5 or agent_id not in self.agents:
            return Command(
                update={"agent_id": state["agent_id"]}, goto=state["agent_id"]
            )

        return Command(update={"agent_id": agent_id}, goto=agent_id)

    async def agent_node(self, state: State, writer: StreamWriter):
        """Creates a node function for a specific agent"""
        agent = self.agents[state["agent_id"]]
        async for chunk in agent.run(
            query=state["query"],
            project_id=state["project_id"],
            conversation_id=state["conversation_id"],
            user_id=state["user_id"],
            node_ids=state["node_ids"],
        ):
            if isinstance(chunk, str):
                writer(chunk)

    def build_graph(self) -> StateGraph:
        """Builds the graph with classifier and agent nodes"""
        builder = StateGraph(self.State)

        # Add classifier as entry point
        builder.add_node("classifier", self.classifier_node)
        # builder.add_edge("classifier", END)

        # # Add agent nodes
        # node_func = await self.agent_node(self.State, StreamWriter)
        for agent_id in self.agents:
            builder.add_node(agent_id, self.agent_node)
            builder.add_edge(agent_id, END)

        builder.set_entry_point("classifier")
        return builder.compile()

    async def process_query(
        self,
        query: str,
        project_id: str,
        conversation_id: str,
        user_id: str,
        node_ids: List[NodeContext],
        agent_id: str,
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Main method to process queries"""
        state = {
            "query": query,
            "project_id": project_id,
            "conversation_id": conversation_id,
            "response": None,
            "user_id": user_id,
            "node_ids": node_ids,
            "agent_id": agent_id,
        }

        graph = self.build_graph()
        async for chunk in graph.astream(state, stream_mode="custom"):
            yield chunk
```
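The `response.content.split("|")` parsing in classifier_node is the fragile step: any deviation from the expected `agent_id|confidence` format aborts routing. A hedged sketch of pulling it into a small testable helper (the function name is hypothetical):

```python
def parse_classification(content: str, fallback_agent: str, threshold: float = 0.5) -> str:
    """Parse the 'agent_id|confidence' string the classifier prompt asks for.

    Falls back to the currently selected agent on malformed output or
    low confidence, mirroring the routing behavior in classifier_node.
    """
    try:
        agent_id, raw_confidence = content.strip().split("|", 1)
        confidence = float(raw_confidence)
    except (ValueError, TypeError):
        return fallback_agent  # malformed output: keep the caller-supplied agent
    if confidence < threshold:
        return fallback_agent
    return agent_id.strip()

print(parse_classification("ml_training_agent|0.85", "codebase_qna_agent"))  # ml_training_agent
print(parse_classification("not-a-valid-answer", "codebase_qna_agent"))      # codebase_qna_agent
```

Isolating the parsing also makes the low-confidence fallback path unit-testable without invoking an LLM.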
🛠️ Refactor suggestion
Consider splitting SimplifiedAgentSupervisor into smaller components.
The SimplifiedAgentSupervisor class has grown too large and handles multiple responsibilities:
- Agent initialization
- Classification
- State management
- Graph building
- Query processing

This violates the Single Responsibility Principle.

Consider splitting into:
- AgentInitializer
- QueryClassifier
- StateManager
- GraphBuilder
- QueryProcessor

Example structure:
```python
class AgentInitializer:
    def __init__(self, db, provider_service):
        self.db = db
        self.provider_service = provider_service

    async def initialize(self, user_id: str) -> Dict[str, Any]:
        # Agent initialization logic
        ...

class QueryClassifier:
    def __init__(self, llm, agent_descriptions):
        self.llm = llm
        self.agent_descriptions = agent_descriptions

    async def classify(self, query: str, current_agent_id: str) -> Tuple[str, float]:
        # Classification logic
        ...

# ... other classes
```
Summary by CodeRabbit

New Features
- FileCallbackHandler for structured logging of agent activities.
- SimplifiedAgentSupervisor for centralized agent management and query classification.
- CodeGenerationAgent, DebugRAGAgent, and LowLevelDesignAgent updated to support asynchronous streaming of results.
- QNAChatAgent, DebuggingChatAgent, and LLDChatAgent.
- CodeGraphService.

Bug Fixes

Documentation