From 6bafa255542afbdefcb883352528e2089ec48a1b Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Sun, 23 Feb 2025 22:25:06 -0800 Subject: [PATCH] interim trying to fix agentchat --- .../agentchat/groupchat/ChatAgentRouter.ts | 85 ++++++++++++++----- .../groupchat/RoundRobinGroupChat.ts | 50 ++++++++--- .../test/agentchat/AgentChatSmokeTest.ts | 2 +- 3 files changed, 105 insertions(+), 32 deletions(-) diff --git a/typescript/src/agentchat/groupchat/ChatAgentRouter.ts b/typescript/src/agentchat/groupchat/ChatAgentRouter.ts index ee0a476dcbd..bb6a8ec35ae 100644 --- a/typescript/src/agentchat/groupchat/ChatAgentRouter.ts +++ b/typescript/src/agentchat/groupchat/ChatAgentRouter.ts @@ -3,7 +3,7 @@ import { HostableAgentAdapter } from "./HostableAgentAdapter"; import { AgentId, IAgentRuntime } from "../../contracts/IAgentRuntime"; import { MessageContext } from "../../contracts/MessageContext"; import { GroupChatStart, GroupChatAgentResponse, GroupChatReset, GroupChatRequestPublish } from "./Events"; -import { ChatMessage } from "../abstractions/Messages"; +import { ChatMessage, AgentMessage } from "../abstractions/Messages"; /** * Configuration for a chat agent within a group chat. @@ -21,9 +21,11 @@ export interface AgentChatConfig { * Routes group chat events to an IChatAgent implementation. */ export class ChatAgentRouter extends HostableAgentAdapter { - private messageBuffer: ChatMessage[] = []; + // Add a private messages store + private messages: AgentMessage[] = []; private readonly parentTopic: { type: string; source: string }; private readonly outputTopic: { type: string; source: string }; + // Store underlying chat agent private readonly agent: IChatAgent; constructor(agentId: AgentId, runtime: IAgentRuntime, config: AgentChatConfig) { @@ -33,28 +35,71 @@ export class ChatAgentRouter extends HostableAgentAdapter { this.agent = config.chatAgent; } + // Add getter for chatAgent + get chatAgent(): IChatAgent { + return this.agent; + } + async handleAsync(message: unknown, context: MessageContext): Promise { - if (message instanceof GroupChatStart) { - if (message.messages) { - this.messageBuffer.push(...message.messages); - } - } else if (message instanceof GroupChatAgentResponse) { - // Store the response message for future context - if (message.agentResponse.message instanceof ChatMessage) { - this.messageBuffer.push(message.agentResponse.message); - } - } else if (message instanceof GroupChatRequestPublish) { - const response = await this.agent.handleAsync([...this.messageBuffer]); - await this.runtime.publishMessageAsync( - new GroupChatAgentResponse({ agentResponse: response }), - this.parentTopic, - this.id - ); - } else if (message instanceof GroupChatReset) { - this.messageBuffer = []; + console.log('ChatAgentRouter.handleAsync:', { + messageType: message?.constructor.name, + agentName: this.agent.name.toString(), + currentMessages: this.messages.length, + messageDetails: message instanceof ChatMessage ? message.content : undefined + }); + + // Handle reset request + if (message instanceof GroupChatReset) { + this.messages = []; await this.agent.resetAsync(); + console.log('Reset agent state'); + return null; } + + // Handle start request + if (message instanceof GroupChatStart) { + this.messages = message.messages || []; + console.log('Started with messages:', { + count: this.messages.length, + messages: this.messages.map(m => ({ + type: m.constructor.name, + content: 'content' in m ? m.content : undefined + })) + }); + return null; + } + + // Handle publish request + if (message instanceof GroupChatRequestPublish) { + // Use stored messages for agent response + const chatMessages = this.messages.filter((m): m is ChatMessage => m instanceof ChatMessage); + console.log('Processing messages:', { + count: chatMessages.length, + messages: chatMessages.map(m => m.content) + }); + const response = await this.chatAgent.handleAsync(chatMessages); + + if (response.message) { + this.messages.push(response.message); + console.log('Got agent response:', { + type: response.message.constructor.name, + content: 'content' in response.message ? response.message.content : undefined, + totalMessages: this.messages.length + }); + } + + return new GroupChatAgentResponse({ agentResponse: response }); + } + + // For other messages + if (message instanceof ChatMessage) { + console.log('Adding chat message:', { + content: message.content + }); + this.messages.push(message); + } + return null; } } diff --git a/typescript/src/agentchat/groupchat/RoundRobinGroupChat.ts b/typescript/src/agentchat/groupchat/RoundRobinGroupChat.ts index 534023a6d49..b44d1baf0cf 100644 --- a/typescript/src/agentchat/groupchat/RoundRobinGroupChat.ts +++ b/typescript/src/agentchat/groupchat/RoundRobinGroupChat.ts @@ -23,46 +23,74 @@ export class RoundRobinGroupChat extends GroupChatManagerBase implements ITeam { } protected async handleStartAsync(message: GroupChatStart, context: MessageContext): Promise { + console.log('RoundRobinGroupChat.handleStartAsync:', { + hasMessages: !!message.messages, + messageCount: message.messages?.length ?? 0 + }); + + // Reset state + await this.resetAsync(); + + // Add initial messages if (message.messages) { this.messages.push(...message.messages); + console.log('Added initial messages:', { + messages: this.messages.map(m => ({ + type: m.constructor.name, + content: 'content' in m ? m.content : undefined + })) + }); } + // Send initial request to first participant await this.publishNextAsync(); } protected async handleResponseAsync(message: GroupChatAgentResponse, context: MessageContext): Promise { const response = message.agentResponse; - // Add ChatMessage type check - if (!(response.message instanceof ChatMessage)) { - throw new Error("Response message must be a ChatMessage"); + // Add response message to collection + if (response.message) { + this.messages.push(response.message); + console.log('Added response message:', { + type: response.message.constructor.name, + content: 'content' in response.message ? response.message.content : undefined, + totalMessages: this.messages.length + }); } - - this.messages.push(response.message); - if (response.innerMessages) { - this.messages.push(...response.innerMessages); - } - - // Check termination condition before continuing + // Check termination before continuing const stopMessage = await this.checkTerminationConditionAsync(this.messages); if (stopMessage) { + console.log('Got stop message:', { + content: stopMessage.content + }); this.messages.push(stopMessage); return; } + // Continue to next participant await this.publishNextAsync(); + + // Wait for message processing + await new Promise(resolve => setTimeout(resolve, 100)); } private async publishNextAsync(): Promise { - // Get next participant in round-robin order this.lastParticipantIndex = (this.lastParticipantIndex + 1) % this.options.participants.size; const participantEntry = Array.from(this.options.participants.entries())[this.lastParticipantIndex]; + if (!participantEntry) { throw new Error("No participants available"); } const [name, participant] = participantEntry; + console.log('Publishing to next participant:', { + name, + participantIndex: this.lastParticipantIndex, + totalParticipants: this.options.participants.size + }); + await this.publishMessageAsync( new GroupChatRequestPublish(), participant.topicType diff --git a/typescript/test/agentchat/AgentChatSmokeTest.ts b/typescript/test/agentchat/AgentChatSmokeTest.ts index c2a406533f7..90f42783c35 100644 --- a/typescript/test/agentchat/AgentChatSmokeTest.ts +++ b/typescript/test/agentchat/AgentChatSmokeTest.ts @@ -25,7 +25,7 @@ class SpeakMessageAgent extends ChatAgentBase { } get producedMessageTypes(): Array { - return [HandoffMessage]; + return [ChatMessage]; // Change: Produce ChatMessage instead of HandoffMessage } async handleAsync(messages: ChatMessage[]): Promise {