Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repeated Messages from OrchestratorAgent #204

Open
AMGMNPLK opened this issue Dec 18, 2024 · 6 comments
Open

Repeated Messages from OrchestratorAgent #204

AMGMNPLK opened this issue Dec 18, 2024 · 6 comments
Labels

Comments

@AMGMNPLK
Copy link

Description:
I am experiencing an issue with an OrchestratorAgent that inherits from Agent where it sends repeated messages to the user. This seems to occur when the agent processes responses from the OpenAI API, leading to the same output being sent multiple times.

Steps to Reproduce:

  1. Initialize the OrchestratorAgent.
  2. Send a message to create a process.
  3. Observe the output in the logs.

Expected Behavior:
The agent should send a single, complete response to the user without repeating the same message multiple times.

Actual Behavior:
The agent sends the same message repeatedly, which appears to be a result of how streaming responses are handled.

@bonk1t
Copy link
Collaborator

bonk1t commented Dec 18, 2024

Hey @AMGMNPLK! Thank you for reporting the issue. Could you send us your code, Python version, framework version, the model you’re using, and any additional information that might help us reproduce the issue?

@AMGMNPLK
Copy link
Author

Versions

Python. 3.11.9
agency-swarm 0.4.2
openai 1.56.2
Models: gpt-4o-mini and gpt-4o

Steps to reproduce

I you ask something that can take some lines, the terminal

Main code

class OrchestratorMessageSchema(BaseModel):
    """Schema for orchestrator messages."""
    message: str = Field(..., description="Content of the message.")

class OrchestratorAgent(Agent):
    """
    Core agent responsible for orchestrating the sales conversation flow using NEPQ methodology.
    Makes structured decisions about which agents to involve and when, while maintaining
    conversation context and prospect profiles.
    """
    
    def __init__(self):
        logger.info("Initializing OrchestratorAgent")
        self._profile_coordinator_lock = Lock()
        super().__init__(
            name="OrchestratorAgent",
            description="Orchestrates the sales process and coordinates between agents",
            instructions="./instructions.md",
            tools_folder="./tools",
            max_prompt_tokens=16000,
            max_completion_tokens=4000,
            temperature=0.7
        )
        logger.debug("OrchestratorAgent initialized successfully")
        self._locks = {}  # Initialize locks dictionary
        
        # Initialize lock for solution suggestions
        self._solutions_lock = Lock()

    def _get_profile_lock(self, profile_id: str) -> Lock:
        """Get a lock for a specific profile."""
        if profile_id not in self._locks:
            self._locks[profile_id] = Lock()
        return self._locks[profile_id]

    def _execute_tool(self, tool_name: str, args: Dict[str, Any]) -> Optional[Any]:
        """Execute a specific tool with locking for ProfileCoordinator."""
        logger.info(f"Executing tool: {tool_name}", extra={'emoji': '🔧'})
        
        try:
            tool_class = self.tools.get(tool_name)
            if not tool_class:
                raise ValueError(f"Tool {tool_name} not found")
            
            # Use lock for ProfileCoordinator
            if tool_name == "ProfileCoordinator":
                with self._profile_coordinator_lock:
                    tool_instance = tool_class(**args)
                    return tool_instance.run()
            else:
                tool_instance = tool_class(**args)
                return tool_instance.run()
                
        except Exception as e:
            logger.error(f"Tool execution failed: {str(e)}")
            raise

    def _log_tool_usage(self, tool_name: str, args: dict):
        """Enhanced tool usage logging"""
        logger.info(f"Tool: {tool_name}", extra={'emoji': '🔧'})
        logger.debug(f"Arguments:\n{json.dumps(args, indent=2)}", extra={'emoji': '📊'})

    def _log_agent_communication(self, from_agent: str, to_agent: str, message: str):
        """Enhanced inter-agent communication logging"""
        logger.info(f"{from_agent} -> {to_agent}", extra={'emoji': '💬'})
        logger.debug(f"Message content:\n{message}", extra={'emoji': '📊'})

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def get_completion(self, message: str) -> str:
        """Get completion from the agent with retry logic."""
        try:
            response = super().get_completion(
                message,
                timeout=180  # Increase timeout to 3 minutes
            )
            return response
        except Exception as e:
            logger.error(f"Error in get_completion: {str(e)}")
            raise

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def send_message(self, recipient: str, message: str, **kwargs):
        """Send message to another agent with retry logic."""
        try:
            return super().send_message(recipient, message, **kwargs)
        except Exception as e:
            logger.error(f"Error sending message to {recipient}: {str(e)}")
            raise

    def process_message(self, message: str) -> Dict:
        """Process incoming messages."""
        try:
            if "create a profile" in message.lower():
                name = self._extract_profile_data(message)
                
                # Validate just the name
                validation_result = ProfileValidator(name=name).run()
                
                if validation_result.get("valid"):
                    # Create profile with just the name
                    profile_agent = ProfileAgent()
                    return profile_agent.run("create", name)
                
                return {"error": validation_result.get("message", "Invalid name")}
                
        except Exception as e:
            return {"error": str(e)}

    def _extract_profile_data(self, message: str) -> str:
        """Extract just the name from the message."""
        try:
            # Look for patterns like "create profile for John" or "profile about John"
            name_match = re.search(r'(?:profile for|about)\s+([^\n.,]+)', message, re.IGNORECASE)
            if name_match:
                return name_match.group(1).strip()
            return "Unknown"
        except Exception as e:
            logger.error(f"Error extracting name: {str(e)}")
            return "Unknown"

    def suggest_potential_solutions(self, profile_id: str) -> Dict:
        """Suggest potential solutions based on the prospect's profile."""
        with self._get_profile_lock(profile_id):
            # Send request to InsightExtractorAgent
            response = self.send_message(
                "InsightExtractorAgent",
                {
                    "action": "suggest_solutions",
                    "profile_id": profile_id
                }
            )
            
            return response

    def delete_discarded_solutions(self, profile_id: str, discarded_solution_ids: List[str]) -> Dict:
        """Delete discarded solutions from the prospect's profile."""
        with self._get_profile_lock(profile_id):
            profile = self.get_profile(profile_id)
            if not profile:
                return {"error": "Profile not found"}
            
            confirmed_solutions = profile.get("confirmed_solutions", [])
            updated_confirmed = [sol for sol in confirmed_solutions if sol not in discarded_solution_ids]
            
            profile["confirmed_solutions"] = updated_confirmed
            
            self.update_profile(profile_id, profile)
            
            return {
                "status": "success",
                "updated_confirmed_solutions": profile["confirmed_solutions"]
            }

    def validate_profile(self, profile_data):
        """Validate the provided profile data."""
        try:
            # Ensure profile_data is not empty and contains required fields
            if not profile_data or 'basic_info' not in profile_data:
                raise ValueError("Profile data is missing required fields.")

            # Call the ProfileValidator function with the actual profile data
            validation_result = ProfileValidator(profile_data=profile_data).run()
            
            return validation_result
        except Exception as e:
            logger.error(f"Error validating profile: {str(e)}")
            return {"error": str(e)}



@bonk1t
Copy link
Collaborator

bonk1t commented Dec 19, 2024

@AMGMNPLK Thank for providing the code. Your code appears to be non-standard, which might be causing the issue. Have you tried using our Cursor Rules with Cursor IDE (or just any advanced LLM) or following the examples in the Documentation?

It would be helpful to rule out custom error handling and retries, as they could be the root cause.

@AMGMNPLK
Copy link
Author

Yes, I'm working with .cursorrules. Knowing this I'll try to fix it asking it to keep attached to them.

@AMGMNPLK
Copy link
Author

It still happens, even creating a new OrchestratorAgent from zero. I tried to use other terminal/shell, with no success.

Copy link
Contributor

This issue is stale because it has been open for 30 days with no activity. Please upgrade to the latest version and test it again.

@github-actions github-actions bot added the stale label Jan 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants