Skip to content

Commit

Permalink
interim trying to fix agentchat
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet committed Feb 24, 2025
1 parent 693587f commit 6bafa25
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 32 deletions.
85 changes: 65 additions & 20 deletions typescript/src/agentchat/groupchat/ChatAgentRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { HostableAgentAdapter } from "./HostableAgentAdapter";
import { AgentId, IAgentRuntime } from "../../contracts/IAgentRuntime";
import { MessageContext } from "../../contracts/MessageContext";
import { GroupChatStart, GroupChatAgentResponse, GroupChatReset, GroupChatRequestPublish } from "./Events";
import { ChatMessage } from "../abstractions/Messages";
import { ChatMessage, AgentMessage } from "../abstractions/Messages";

/**
* Configuration for a chat agent within a group chat.
Expand All @@ -21,9 +21,11 @@ export interface AgentChatConfig {
* Routes group chat events to an IChatAgent implementation.
*/
export class ChatAgentRouter extends HostableAgentAdapter {
private messageBuffer: ChatMessage[] = [];
// Add a private messages store
private messages: AgentMessage[] = [];
private readonly parentTopic: { type: string; source: string };
private readonly outputTopic: { type: string; source: string };
// Store underlying chat agent
private readonly agent: IChatAgent;

constructor(agentId: AgentId, runtime: IAgentRuntime, config: AgentChatConfig) {
Expand All @@ -33,28 +35,71 @@ export class ChatAgentRouter extends HostableAgentAdapter {
this.agent = config.chatAgent;
}

// Add getter for chatAgent
get chatAgent(): IChatAgent {
return this.agent;
}

async handleAsync(message: unknown, context: MessageContext): Promise<unknown> {
if (message instanceof GroupChatStart) {
if (message.messages) {
this.messageBuffer.push(...message.messages);
}
} else if (message instanceof GroupChatAgentResponse) {
// Store the response message for future context
if (message.agentResponse.message instanceof ChatMessage) {
this.messageBuffer.push(message.agentResponse.message);
}
} else if (message instanceof GroupChatRequestPublish) {
const response = await this.agent.handleAsync([...this.messageBuffer]);
await this.runtime.publishMessageAsync(
new GroupChatAgentResponse({ agentResponse: response }),
this.parentTopic,
this.id
);
} else if (message instanceof GroupChatReset) {
this.messageBuffer = [];
console.log('ChatAgentRouter.handleAsync:', {
messageType: message?.constructor.name,
agentName: this.agent.name.toString(),
currentMessages: this.messages.length,
messageDetails: message instanceof ChatMessage ? message.content : undefined
});

// Handle reset request
if (message instanceof GroupChatReset) {
this.messages = [];
await this.agent.resetAsync();
console.log('Reset agent state');
return null;
}

// Handle start request
if (message instanceof GroupChatStart) {
this.messages = message.messages || [];
console.log('Started with messages:', {
count: this.messages.length,
messages: this.messages.map(m => ({
type: m.constructor.name,
content: 'content' in m ? m.content : undefined
}))
});
return null;
}

// Handle publish request
if (message instanceof GroupChatRequestPublish) {
// Use stored messages for agent response
const chatMessages = this.messages.filter((m): m is ChatMessage => m instanceof ChatMessage);
console.log('Processing messages:', {
count: chatMessages.length,
messages: chatMessages.map(m => m.content)
});

const response = await this.chatAgent.handleAsync(chatMessages);

if (response.message) {
this.messages.push(response.message);
console.log('Got agent response:', {
type: response.message.constructor.name,
content: 'content' in response.message ? response.message.content : undefined,
totalMessages: this.messages.length
});
}

return new GroupChatAgentResponse({ agentResponse: response });
}

// For other messages
if (message instanceof ChatMessage) {
console.log('Adding chat message:', {
content: message.content
});
this.messages.push(message);
}

return null;
}
}
50 changes: 39 additions & 11 deletions typescript/src/agentchat/groupchat/RoundRobinGroupChat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,74 @@ export class RoundRobinGroupChat extends GroupChatManagerBase implements ITeam {
}

protected async handleStartAsync(message: GroupChatStart, context: MessageContext): Promise<void> {
console.log('RoundRobinGroupChat.handleStartAsync:', {
hasMessages: !!message.messages,
messageCount: message.messages?.length ?? 0
});

// Reset state
await this.resetAsync();

// Add initial messages
if (message.messages) {
this.messages.push(...message.messages);
console.log('Added initial messages:', {
messages: this.messages.map(m => ({
type: m.constructor.name,
content: 'content' in m ? m.content : undefined
}))
});
}

// Send initial request to first participant
await this.publishNextAsync();
}

protected async handleResponseAsync(message: GroupChatAgentResponse, context: MessageContext): Promise<void> {
const response = message.agentResponse;

// Add ChatMessage type check
if (!(response.message instanceof ChatMessage)) {
throw new Error("Response message must be a ChatMessage");
// Add response message to collection
if (response.message) {
this.messages.push(response.message);
console.log('Added response message:', {
type: response.message.constructor.name,
content: 'content' in response.message ? response.message.content : undefined,
totalMessages: this.messages.length
});
}

this.messages.push(response.message);

if (response.innerMessages) {
this.messages.push(...response.innerMessages);
}

// Check termination condition before continuing
// Check termination before continuing
const stopMessage = await this.checkTerminationConditionAsync(this.messages);
if (stopMessage) {
console.log('Got stop message:', {
content: stopMessage.content
});
this.messages.push(stopMessage);
return;
}

// Continue to next participant
await this.publishNextAsync();

// Wait for message processing
await new Promise(resolve => setTimeout(resolve, 100));
}

private async publishNextAsync(): Promise<void> {
// Get next participant in round-robin order
this.lastParticipantIndex = (this.lastParticipantIndex + 1) % this.options.participants.size;
const participantEntry = Array.from(this.options.participants.entries())[this.lastParticipantIndex];

if (!participantEntry) {
throw new Error("No participants available");
}

const [name, participant] = participantEntry;
console.log('Publishing to next participant:', {
name,
participantIndex: this.lastParticipantIndex,
totalParticipants: this.options.participants.size
});

await this.publishMessageAsync(
new GroupChatRequestPublish(),
participant.topicType
Expand Down
2 changes: 1 addition & 1 deletion typescript/test/agentchat/AgentChatSmokeTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class SpeakMessageAgent extends ChatAgentBase {
}

get producedMessageTypes(): Array<Function> {
return [HandoffMessage];
return [ChatMessage]; // Change: Produce ChatMessage instead of HandoffMessage
}

async handleAsync(messages: ChatMessage[]): Promise<Response> {
Expand Down

0 comments on commit 6bafa25

Please sign in to comment.