diff --git a/typescript/src/agentchat/abstractions/ChatAgent.ts b/typescript/src/agentchat/abstractions/ChatAgent.ts new file mode 100644 index 00000000000..b3e3e8b7b16 --- /dev/null +++ b/typescript/src/agentchat/abstractions/ChatAgent.ts @@ -0,0 +1,105 @@ +import { AgentMessage, ChatMessage } from '../messages/Messages'; + +/** + * A valid name for an agent. + * To ensure parity with Python, we require agent names to be Python identifiers. + */ +export class AgentName { + private static readonly ID_START_CLASS = /[\p{Lu}\p{Ll}\p{Lt}\p{Lm}\p{Lo}\p{Nl}_\u1185-\u1186\u2118\u212E\u309B-\u309C]/u; + private static readonly ID_CONTINUE_CLASS = /[\w\p{Nl}\p{Mc}_\u1185-\u1186\u2118\u212E\u309B-\u309C\u00B7\u0387\u1369-\u1371\u19DA\u200C\u200D\u30FB\uFF65]/u; + private static readonly AGENT_NAME_REGEX = new RegExp(`^${AgentName.ID_START_CLASS.source}${AgentName.ID_CONTINUE_CLASS.source}*$`); + + constructor(private readonly value: string) { + AgentName.checkValid(value); + } + + public static isValid(name: string): boolean { + return AgentName.AGENT_NAME_REGEX.test(name); + } + + public static checkValid(name: string): void { + if (!AgentName.isValid(name)) { + throw new Error(`Agent name '${name}' is not a valid identifier.`); + } + } + + toString(): string { + return this.value; + } +} + +/** + * A response from calling IChatAgent's handleAsync. + */ +export interface Response { + /** + * A chat message produced by the agent as a response. + */ + message: ChatMessage; + + /** + * Inner messages produced by the agent. + */ + innerMessages?: AgentMessage[]; +} + +/** + * Base class for representing a stream of messages interspacing responses and internal processing messages. + * This functions as a discriminated union. + */ +export class StreamingFrame { + public type: 'InternalMessage' | 'Response' = 'Response'; // Set default value + public internalMessage?: TInternalMessage; + public response?: TResponse; +} + +/** + * Base class for representing a stream of messages with internal messages of any AgentMessage subtype. + */ +export class BaseStreamingFrame extends StreamingFrame {} + +/** + * The stream frame for IChatAgent's streamAsync + */ +export class ChatStreamFrame extends BaseStreamingFrame {} + +/** + * An agent that can participate in a chat. + */ +export interface IChatAgent { + /** + * The name of the agent. This is used by team to uniquely identify the agent. + * It should be unique within the team. + */ + readonly name: AgentName; + + /** + * The description of the agent. This is used by team to make decisions about which agents to use. + * The description should describe the agent's capabilities and how to interact with it. + */ + readonly description: string; + + /** + * The types of messages that the agent produces. + */ + readonly producedMessageTypes: Array; + + /** + * Handles chat messages asynchronously and produces a response. + * @param messages The messages to handle + * @returns A promise resolving to the response + */ + handleAsync(messages: ChatMessage[]): Promise; + + /** + * Handles chat messages asynchronously and produces a stream of frames. + * @param messages The messages to handle + * @returns An async iterable of chat stream frames + */ + streamAsync(messages: ChatMessage[]): AsyncIterable; + + /** + * Reset the agent to its initialization state. + */ + resetAsync(): Promise; +} diff --git a/typescript/src/agentchat/abstractions/Events.ts b/typescript/src/agentchat/abstractions/Events.ts new file mode 100644 index 00000000000..afed0c86a50 --- /dev/null +++ b/typescript/src/agentchat/abstractions/Events.ts @@ -0,0 +1,57 @@ +import { AgentMessage, ChatMessage, StopMessage } from "../messages/Messages"; +import { Response } from "./ChatAgent"; + +/** + * Base class for all group chat events. + */ +export abstract class GroupChatEventBase {} + +/** + * A request to start a group chat. + */ +export class GroupChatStart extends GroupChatEventBase { + /** + * An optional list of messages to start the group chat. + */ + messages?: ChatMessage[]; +} + +/** + * A response published to a group chat. + */ +export class GroupChatAgentResponse extends GroupChatEventBase { + /** + * The response from an agent. + */ + agentResponse!: Response; +} + +/** + * A request to publish a message to a group chat. + */ +export class GroupChatRequestPublish extends GroupChatEventBase {} + +/** + * A message from a group chat. + */ +export class GroupChatMessage extends GroupChatEventBase { + /** + * The message that was published. + */ + message!: AgentMessage; +} + +/** + * A message indicating that group chat was terminated. + */ +export class GroupChatTermination extends GroupChatEventBase { + /** + * The stop message that indicates the reason of termination. + */ + message!: StopMessage; +} + +/** + * A request to reset the agents in the group chat. + */ +export class GroupChatReset extends GroupChatEventBase {} diff --git a/typescript/src/agentchat/abstractions/HandlingStream.ts b/typescript/src/agentchat/abstractions/HandlingStream.ts new file mode 100644 index 00000000000..65b17a5627a --- /dev/null +++ b/typescript/src/agentchat/abstractions/HandlingStream.ts @@ -0,0 +1,12 @@ +/** + * Interface for streaming handlers that can process an input type and produce an output type. + */ +export interface IHandleStream { + /** + * Processes the input and produces a stream of output. + * @param input The input to process + * @param cancellation Optional token to cancel the operation + * @returns An async iterable of output items + */ + streamAsync(input: TIn, cancellation?: AbortSignal): AsyncIterable; +} diff --git a/typescript/src/agentchat/abstractions/Handoff.ts b/typescript/src/agentchat/abstractions/Handoff.ts new file mode 100644 index 00000000000..d98863012de --- /dev/null +++ b/typescript/src/agentchat/abstractions/Handoff.ts @@ -0,0 +1,23 @@ +import { AgentMessage } from "./Messages"; + +/** + * Message indicating that control should be handed off to another agent. + */ +export class HandoffMessage extends AgentMessage { + /** + * Gets or sets the target agent to hand off control to. + */ + targetAgent: string; + + /** + * Gets or sets an optional message to send to the target agent. + */ + message?: string; + + constructor(targetAgent: string, message?: string, source?: string) { + super(); + this.targetAgent = targetAgent; + this.message = message; + this.source = source; + } +} diff --git a/typescript/src/agentchat/abstractions/ITeam.ts b/typescript/src/agentchat/abstractions/ITeam.ts new file mode 100644 index 00000000000..ad307238858 --- /dev/null +++ b/typescript/src/agentchat/abstractions/ITeam.ts @@ -0,0 +1,25 @@ +import { TaskFrame } from "./Tasks"; + +/** + * Defines a team of agents that can work together to accomplish tasks. + */ +export interface ITeam { + /** + * Gets a unique identifier for this team. + */ + readonly teamId: string; + + /** + * Executes a task and returns a stream of frames containing intermediate messages and final results. + * @param task The task to execute, typically a string or message + * @param cancellation Optional cancellation token + * @returns An async iterable of task frames containing messages or results + */ + streamAsync(task: string | unknown, cancellation?: AbortSignal): AsyncIterable; + + /** + * Resets the team to its initial state. + * @param cancellation Optional cancellation token + */ + resetAsync(cancellation?: AbortSignal): Promise; +} diff --git a/typescript/src/agentchat/abstractions/Messages.ts b/typescript/src/agentchat/abstractions/Messages.ts new file mode 100644 index 00000000000..055ae2b2176 --- /dev/null +++ b/typescript/src/agentchat/abstractions/Messages.ts @@ -0,0 +1,70 @@ +/** + * Base class for all messages in the agent chat system. + */ +export abstract class AgentMessage { + /** + * Gets or sets the source of the message (e.g., agent name, system, user). + */ + source?: string; +} + +/** + * Represents a basic chat message with text content. + */ +export class ChatMessage extends AgentMessage { + /** + * Gets or sets the text content of the message. + */ + content: string; + + constructor(content: string, source?: string) { + super(); + this.content = content; + this.source = source; + } +} + +/** + * Message indicating that the chat should stop. + */ +export class StopMessage extends AgentMessage { + /** + * Gets or sets the reason for stopping. + */ + content: string; + + constructor(content: string, source?: string) { + super(); + this.content = content; + this.source = source; + } +} + +/** + * Represents metadata about a message exchange. + */ +export class MessageMetadata { + /** + * Gets or sets any model parameters used to generate the message. + */ + modelParameters?: Record; +} + +/** + * Interface for handlers that need to store message metadata. + */ +export interface IStoreMessageMetadata { + /** + * Gets the metadata associated with a message. + * @param messageId The unique identifier of the message. + * @returns The metadata associated with the message. + */ + getMetadata(messageId: string): MessageMetadata | undefined; + + /** + * Associates metadata with a message. + * @param messageId The unique identifier of the message. + * @param metadata The metadata to store. + */ + setMetadata(messageId: string, metadata: MessageMetadata): void; +} diff --git a/typescript/src/agentchat/abstractions/Tasks.ts b/typescript/src/agentchat/abstractions/Tasks.ts new file mode 100644 index 00000000000..e76346b90ae --- /dev/null +++ b/typescript/src/agentchat/abstractions/Tasks.ts @@ -0,0 +1,42 @@ +import { AgentMessage } from "./Messages"; + +/** + * Represents a frame of task execution, containing either a message or result. + */ +export class TaskFrame { + public readonly isResult: boolean; + public readonly message?: AgentMessage; + public readonly result?: TaskResult; + + /** + * Creates a new task frame. + * @param messageOrResult Either an AgentMessage or TaskResult + */ + constructor(messageOrResult: AgentMessage | TaskResult) { + if (messageOrResult instanceof TaskResult) { + this.isResult = true; + this.result = messageOrResult; + } else { + this.isResult = false; + this.message = messageOrResult; + } + } +} + +/** + * Represents the result of a task execution, including all messages generated. + */ +export class TaskResult { + /** + * Gets the messages generated during task execution. + */ + public readonly messages: AgentMessage[]; + + /** + * Creates a new task result. + * @param messages The messages generated during task execution + */ + constructor(messages: AgentMessage[]) { + this.messages = messages; + } +} diff --git a/typescript/src/agentchat/abstractions/Termination.ts b/typescript/src/agentchat/abstractions/Termination.ts new file mode 100644 index 00000000000..57be6dd9054 --- /dev/null +++ b/typescript/src/agentchat/abstractions/Termination.ts @@ -0,0 +1,33 @@ +import { AgentMessage, StopMessage } from "./Messages"; + +/** + * Exception thrown when a chat has already terminated. + */ +export class TerminatedException extends Error { + constructor() { + super("The chat has already terminated."); + this.name = "TerminatedException"; + } +} + +/** + * Defines a condition that determines when a chat should terminate. + */ +export interface ITerminationCondition { + /** + * Gets whether the chat has already terminated. + */ + readonly isTerminated: boolean; + + /** + * Checks if new messages should cause termination of the chat. + * @param messages The messages to check + * @returns A StopMessage if the chat should terminate, null otherwise + */ + checkAndUpdateAsync(messages: AgentMessage[]): Promise; + + /** + * Resets the termination condition to its initial state. + */ + reset(): void; +} diff --git a/typescript/src/agentchat/abstractions/Tools.ts b/typescript/src/agentchat/abstractions/Tools.ts new file mode 100644 index 00000000000..0132855f019 --- /dev/null +++ b/typescript/src/agentchat/abstractions/Tools.ts @@ -0,0 +1,43 @@ +import { ChatMessage } from "./Messages"; + +/** + * Represents a tool that can be used by an agent to perform specific tasks. + */ +export interface ITool { + /** + * Gets the name of the tool. + */ + readonly name: string; + + /** + * Gets the description of what the tool does and how to use it. + */ + readonly description: string; + + /** + * Executes the tool with the given parameters. + * @param parameters The parameters to pass to the tool + * @returns A ChatMessage containing the result of the tool execution + */ + executeAsync(parameters: Record): Promise; +} + +/** + * Represents the result of a tool execution. + */ +export interface ToolResult { + /** + * Gets whether the tool execution was successful. + */ + success: boolean; + + /** + * Gets the message describing the result or error. + */ + message: string; + + /** + * Gets any additional data produced by the tool. + */ + data?: unknown; +} diff --git a/typescript/src/agentchat/agents/ChatAgentBase.ts b/typescript/src/agentchat/agents/ChatAgentBase.ts new file mode 100644 index 00000000000..e120c193112 --- /dev/null +++ b/typescript/src/agentchat/agents/ChatAgentBase.ts @@ -0,0 +1,73 @@ +import { AgentName, IChatAgent, Response, ChatStreamFrame } from "../abstractions/ChatAgent"; +import { ChatMessage } from "../messages/Messages"; + +/** + * Base class for a chat agent. + */ +export abstract class ChatAgentBase implements IChatAgent { + /** + * Gets the name of the agent. This is used by team to uniquely identify the agent. + */ + public readonly name: AgentName; + + /** + * Gets the description of the agent. This is used by team to make decisions about which agents to use. + */ + public readonly description: string; + + /** + * Creates a new instance of ChatAgentBase. + * @param name The name of the agent + * @param description The description of the agent's capabilities + */ + constructor(name: string, description: string) { + this.name = new AgentName(name); + this.description = description; + } + + /** + * Gets the types of messages that the agent produces. + * Must be implemented by derived classes. + */ + abstract get producedMessageTypes(): Array; + + /** + * Handles chat messages asynchronously and produces a stream of frames. + * Default implementation wraps handleAsync with a stream that includes any inner messages. + * @param messages The messages to handle + * @returns An async iterable of chat stream frames + */ + async *streamAsync(messages: ChatMessage[]): AsyncIterableIterator { + const response = await this.handleAsync(messages); + + // First yield any inner messages if present + if (response.innerMessages) { + for (const message of response.innerMessages) { + yield { + type: 'InternalMessage', + internalMessage: message + }; + } + } + + // Then yield the final response + yield { + type: 'Response', + response: response + }; + } + + /** + * Handles chat messages asynchronously and produces a response. + * Must be implemented by derived classes. + * @param messages The messages to handle + * @returns A promise resolving to the response + */ + abstract handleAsync(messages: ChatMessage[]): Promise; + + /** + * Reset the agent to its initialization state. + * Must be implemented by derived classes. + */ + abstract resetAsync(): Promise; +} diff --git a/typescript/src/agentchat/groupchat/ChatAgentRouter.ts b/typescript/src/agentchat/groupchat/ChatAgentRouter.ts new file mode 100644 index 00000000000..ee0a476dcbd --- /dev/null +++ b/typescript/src/agentchat/groupchat/ChatAgentRouter.ts @@ -0,0 +1,60 @@ +import { IChatAgent } from "../abstractions/ChatAgent"; +import { HostableAgentAdapter } from "./HostableAgentAdapter"; +import { AgentId, IAgentRuntime } from "../../contracts/IAgentRuntime"; +import { MessageContext } from "../../contracts/MessageContext"; +import { GroupChatStart, GroupChatAgentResponse, GroupChatReset, GroupChatRequestPublish } from "./Events"; +import { ChatMessage } from "../abstractions/Messages"; + +/** + * Configuration for a chat agent within a group chat. + */ +export interface AgentChatConfig { + /** The topic type to subscribe to */ + readonly parentTopicType: string; + /** The topic type for output */ + readonly outputTopicType: string; + /** The chat agent to route messages to */ + readonly chatAgent: IChatAgent; +} + +/** + * Routes group chat events to an IChatAgent implementation. + */ +export class ChatAgentRouter extends HostableAgentAdapter { + private messageBuffer: ChatMessage[] = []; + private readonly parentTopic: { type: string; source: string }; + private readonly outputTopic: { type: string; source: string }; + private readonly agent: IChatAgent; + + constructor(agentId: AgentId, runtime: IAgentRuntime, config: AgentChatConfig) { + super(agentId, runtime, config.chatAgent.description); + this.parentTopic = { type: config.parentTopicType, source: this.id.key }; + this.outputTopic = { type: config.outputTopicType, source: this.id.key }; + this.agent = config.chatAgent; + } + + async handleAsync(message: unknown, context: MessageContext): Promise { + if (message instanceof GroupChatStart) { + if (message.messages) { + this.messageBuffer.push(...message.messages); + } + } else if (message instanceof GroupChatAgentResponse) { + // Store the response message for future context + if (message.agentResponse.message instanceof ChatMessage) { + this.messageBuffer.push(message.agentResponse.message); + } + } else if (message instanceof GroupChatRequestPublish) { + const response = await this.agent.handleAsync([...this.messageBuffer]); + await this.runtime.publishMessageAsync( + new GroupChatAgentResponse({ agentResponse: response }), + this.parentTopic, + this.id + ); + } else if (message instanceof GroupChatReset) { + this.messageBuffer = []; + await this.agent.resetAsync(); + } + + return null; + } +} diff --git a/typescript/src/agentchat/groupchat/Events.ts b/typescript/src/agentchat/groupchat/Events.ts new file mode 100644 index 00000000000..eb7049a26b6 --- /dev/null +++ b/typescript/src/agentchat/groupchat/Events.ts @@ -0,0 +1,69 @@ +import { AgentMessage, ChatMessage, StopMessage } from "../messages/Messages"; +import { Response } from "../abstractions/ChatAgent"; + +/** + * Base class for all group chat events. + */ +export abstract class GroupChatEventBase {} + +/** + * A request to start a group chat. + */ +export class GroupChatStart extends GroupChatEventBase { + /** + * An optional list of messages to start the group chat. + */ + messages?: ChatMessage[]; + + constructor(options?: { messages?: ChatMessage[] }) { + super(); + if (options?.messages) { + this.messages = options.messages; + } + } +} + +/** + * A response published to a group chat. + */ +export class GroupChatAgentResponse extends GroupChatEventBase { + /** + * The response from an agent. + */ + agentResponse!: Response; + + constructor(options: { agentResponse: Response }) { + super(); + this.agentResponse = options.agentResponse; + } +} + +/** + * A request to publish a message to a group chat. + */ +export class GroupChatRequestPublish extends GroupChatEventBase {} + +/** + * A message from a group chat. + */ +export class GroupChatMessage extends GroupChatEventBase { + /** + * The message that was published. + */ + message!: AgentMessage; +} + +/** + * A message indicating that group chat was terminated. + */ +export class GroupChatTermination extends GroupChatEventBase { + /** + * The stop message that indicates the reason of termination. + */ + message!: StopMessage; +} + +/** + * A request to reset the agents in the group chat. + */ +export class GroupChatReset extends GroupChatEventBase {} diff --git a/typescript/src/agentchat/groupchat/GroupChatBase.ts b/typescript/src/agentchat/groupchat/GroupChatBase.ts new file mode 100644 index 00000000000..3f95578336c --- /dev/null +++ b/typescript/src/agentchat/groupchat/GroupChatBase.ts @@ -0,0 +1,79 @@ +import { ITeam } from "../abstractions/ITeam"; +import { TaskFrame, TaskResult } from "../abstractions/Tasks"; +import { AgentMessage, ChatMessage } from "../abstractions/Messages"; +import { ITerminationCondition } from "../abstractions/Termination"; +import { GroupChatOptions } from "./GroupChatOptions"; +import { GroupChatManagerBase } from "./GroupChatManagerBase"; +import { GroupChatStart, GroupChatRequestPublish } from "./Events"; +import { MessageContext } from "../../contracts/MessageContext"; + +/** + * Base class for implementing group chat functionality. + */ +export abstract class GroupChatBase extends GroupChatManagerBase implements ITeam { + /** + * Gets a unique identifier for this team. + */ + public teamId: string; + + /** + * Creates a new instance of GroupChatBase. + * @param options Configuration options for the group chat + */ + constructor(options: GroupChatOptions) { + super(options); + this.teamId = crypto.randomUUID(); + } + + /** + * Executes a task and returns a stream of frames containing intermediate messages and final results. + * @param task The task to execute, typically a string or message + * @param cancellation Optional cancellation token + */ + async *streamAsync(task: string | unknown, cancellation?: AbortSignal): AsyncIterableIterator { + // Convert task to initial messages if it's a string + const initialMessages = typeof task === 'string' + ? [new ChatMessage(task)] + : task instanceof ChatMessage + ? [task] + : []; + + // Reset state before starting + await this.resetAsync(); + + // Start group chat with initial messages + const context = new MessageContext(crypto.randomUUID(), cancellation); + await this.handleAsync(new GroupChatStart({ messages: initialMessages }), context); + + // Convert messages to frames and yield + yield new TaskFrame(new TaskResult(this.messages)); + } + + /** + * Creates a new GroupChatBase with specified options. + * @param groupChatTopicType Topic type for group chat messages + * @param outputTopicType Topic type for output messages + * @param terminationCondition Optional condition for chat termination + * @param maxTurns Optional maximum number of turns + */ + protected static createBase( + this: new (options: GroupChatOptions) => T, + groupChatTopicType: string, + outputTopicType: string, + terminationCondition?: ITerminationCondition, + maxTurns?: number + ): T { + const options = new GroupChatOptions(groupChatTopicType, outputTopicType); + options.terminationCondition = terminationCondition; + options.maxTurns = maxTurns; + return new this(options); + } + + /** + * Reset the team to its initial state. + * @param cancellation Optional cancellation token + */ + async resetAsync(cancellation?: AbortSignal): Promise { + await super.resetAsync(); + } +} diff --git a/typescript/src/agentchat/groupchat/GroupChatHandlerRouter.ts b/typescript/src/agentchat/groupchat/GroupChatHandlerRouter.ts new file mode 100644 index 00000000000..fb514dcc179 --- /dev/null +++ b/typescript/src/agentchat/groupchat/GroupChatHandlerRouter.ts @@ -0,0 +1,63 @@ +import { IAgentRuntime, AgentId, TopicId } from "../../contracts/IAgentRuntime"; +import { MessageContext } from "../../contracts/MessageContext"; +import { HostableAgentAdapter } from "./HostableAgentAdapter"; +import { GroupChatAgentResponse, GroupChatStart } from "./Events"; +import { IGroupChatHandler } from "./GroupChatManagerBase"; + +/** + * Configuration for a group chat handler. + */ +export interface GroupChatHandlerConfig { + /** + * The topic type to subscribe to + */ + topicType: string; + + /** + * The handler for group chat events + */ + handler: IGroupChatHandler; +} + +/** + * Routes group chat events to appropriate handlers based on topic type. + */ +export class GroupChatHandlerRouter extends HostableAgentAdapter { + private readonly parentTopic: TopicId; + private readonly handler: IGroupChatHandler; + + /** + * Creates a new instance of GroupChatHandlerRouter. + * @param agentContext The context for agent instantiation + * @param config The configuration for this handler + */ + constructor( + agentId: AgentId, + runtime: IAgentRuntime, + config: GroupChatHandlerConfig + ) { + super(agentId, runtime, `Router for ${config.topicType}`); + this.parentTopic = { type: config.topicType, source: this.id.key }; + this.handler = config.handler; + + // Attach the message publisher to the handler + this.handler.attachMessagePublishServicer(async (event, topicType, cancellation) => { + // Updated to match IAgentRuntime.publishMessageAsync signature + await this.runtime.publishMessageAsync( + event, + { type: topicType, source: this.id.key }, + this.id + ); + }); + } + + /** + * Handles incoming messages by routing them to the appropriate handler. + */ + async handleAsync(message: unknown, context: MessageContext): Promise { + if (message instanceof GroupChatStart || message instanceof GroupChatAgentResponse) { + await this.handler.handleAsync(message, context); + } + return null; + } +} diff --git a/typescript/src/agentchat/groupchat/GroupChatManagerBase.ts b/typescript/src/agentchat/groupchat/GroupChatManagerBase.ts new file mode 100644 index 00000000000..9e824d3c99c --- /dev/null +++ b/typescript/src/agentchat/groupchat/GroupChatManagerBase.ts @@ -0,0 +1,115 @@ +import { ChatMessage, StopMessage, AgentMessage } from "../abstractions/Messages"; +import { ITerminationCondition } from "../abstractions/Termination"; +import { GroupChatOptions } from "./GroupChatOptions"; +import { IHandle } from "../../contracts/IHandle"; +import { + GroupChatEventBase, + GroupChatStart, + GroupChatAgentResponse, + GroupChatRequestPublish, + GroupChatMessage, + GroupChatTermination, + GroupChatReset +} from "./Events"; +import { MessageContext } from "../../contracts/MessageContext"; + +/** + * Delegate type for publishing messages. + */ +type MessagePublishServicer = (event: GroupChatEventBase, topicType: string, cancellation?: AbortSignal) => Promise; + +/** + * Interface for handlers that can process group chat events. + */ +export interface IGroupChatHandler { + /** + * Attaches a servicer for publishing messages. + */ + attachMessagePublishServicer(servicer?: MessagePublishServicer): void; + + /** + * Detaches the current message publishing servicer. + */ + detachMessagePublishServicer(): void; + + /** + * Handles group chat messages. + */ + handleAsync(message: GroupChatStart | GroupChatAgentResponse | unknown, context: MessageContext): Promise; +} + +/** + * Base class for managing group chat interactions. + */ +export abstract class GroupChatManagerBase implements IGroupChatHandler { + protected readonly options: GroupChatOptions; + protected messagePublishServicer?: MessagePublishServicer; + protected messages: AgentMessage[] = []; // Changed from ChatMessage[] to AgentMessage[] + private readonly terminationCondition?: ITerminationCondition; + private readonly maxTurns?: number; + private currentTurn: number = 0; + + constructor(options: GroupChatOptions) { + this.options = options; + this.terminationCondition = options.terminationCondition; + this.maxTurns = options.maxTurns; + } + + attachMessagePublishServicer(servicer?: MessagePublishServicer): void { + this.messagePublishServicer = servicer; + } + + detachMessagePublishServicer(): void { + this.messagePublishServicer = undefined; + } + + protected async publishMessageAsync( + event: GroupChatEventBase, + topicType: string, + cancellation?: AbortSignal + ): Promise { + if (this.messagePublishServicer) { + await this.messagePublishServicer(event, topicType, cancellation); + } + } + + /** + * Handles the start of a group chat session. + */ + async handleAsync(message: GroupChatStart | GroupChatAgentResponse | unknown, context: MessageContext): Promise { + if (message instanceof GroupChatStart) { + return this.handleStartAsync(message, context); + } else if (message instanceof GroupChatAgentResponse) { + return this.handleResponseAsync(message, context); + } + throw new Error(`Unhandled message type: ${typeof message}`); + } + + protected abstract handleStartAsync(message: GroupChatStart, context: MessageContext): Promise; + protected abstract handleResponseAsync(message: GroupChatAgentResponse, context: MessageContext): Promise; + + protected async checkTerminationConditionAsync(messages: AgentMessage[]): Promise { + if (this.maxTurns && ++this.currentTurn >= this.maxTurns) { + return new StopMessage("Maximum turns reached", "system"); + } + + if (this.terminationCondition) { + return await this.terminationCondition.checkAndUpdateAsync(messages); + } + + return null; + } + + protected async resetAsync(): Promise { + this.currentTurn = 0; + this.messages = []; + this.terminationCondition?.reset(); + + if (this.messagePublishServicer) { + await this.messagePublishServicer( + new GroupChatReset(), + this.options.groupChatTopicType + ); + } + } +} diff --git a/typescript/src/agentchat/groupchat/GroupChatOptions.ts b/typescript/src/agentchat/groupchat/GroupChatOptions.ts new file mode 100644 index 00000000000..1c1a164124b --- /dev/null +++ b/typescript/src/agentchat/groupchat/GroupChatOptions.ts @@ -0,0 +1,56 @@ +import { ITerminationCondition } from "../abstractions/Termination"; + +/** + * Represents information about a participant in a group chat. + */ +export class GroupParticipant { + /** + * Creates a new group participant. + * @param topicType The topic type for this participant + * @param description Description of the participant's capabilities + */ + constructor( + public readonly topicType: string, + public readonly description: string + ) {} +} + +/** + * Configuration options for a group chat. + */ +export class GroupChatOptions { + /** + * Gets the topic type for group chat messages. + */ + public readonly groupChatTopicType: string; + + /** + * Gets the topic type for output messages. + */ + public readonly outputTopicType: string; + + /** + * Gets or sets the termination condition for the chat. + */ + public terminationCondition?: ITerminationCondition; + + /** + * Gets or sets the maximum number of chat turns. + */ + public maxTurns?: number; + + /** + * Gets the participants in the chat. + */ + public readonly participants: Map = new Map(); + + /** + * Creates a new instance of GroupChatOptions. + * @param groupChatTopicType Topic type for group chat messages + * @param outputTopicType Topic type for output messages + */ + constructor(groupChatTopicType: string, outputTopicType: string) { + this.groupChatTopicType = groupChatTopicType; + this.outputTopicType = outputTopicType; + } +} diff --git a/typescript/src/agentchat/groupchat/HostableAgentAdapter.ts b/typescript/src/agentchat/groupchat/HostableAgentAdapter.ts new file mode 100644 index 00000000000..d2bb4e9c0cf --- /dev/null +++ b/typescript/src/agentchat/groupchat/HostableAgentAdapter.ts @@ -0,0 +1,31 @@ +import { IAgent } from "../../contracts/IAgent"; +import { AgentId } from "../../contracts/IAgentRuntime"; +import { IAgentRuntime } from "../../contracts/IAgentRuntime"; +import { MessageContext } from "../../contracts/MessageContext"; +import { BaseAgent } from "../../core/BaseAgent"; + +/** + * Base class that adapts an agent for hosting in the runtime system. + */ +export abstract class HostableAgentAdapter extends BaseAgent { + /** + * Creates a new instance of the HostableAgentAdapter. + * @param id The unique identifier for this agent + * @param runtime The runtime instance this agent will use + * @param description A brief description of the agent's purpose + */ + constructor(id: AgentId, runtime: IAgentRuntime, description: string) { + super(id, runtime, description); + } + + /** + * Gets metadata associated with the agent. + */ + get metadata() { + return { + type: this.id.type, + key: this.id.key, + description: this.description + }; + } +} diff --git a/typescript/src/agentchat/groupchat/OutputCollectorAgent.ts b/typescript/src/agentchat/groupchat/OutputCollectorAgent.ts new file mode 100644 index 00000000000..d435db8447c --- /dev/null +++ b/typescript/src/agentchat/groupchat/OutputCollectorAgent.ts @@ -0,0 +1,45 @@ +import { AgentId, IAgentRuntime } from "../../contracts/IAgentRuntime"; +import { MessageContext } from "../../contracts/MessageContext"; +import { HostableAgentAdapter } from "./HostableAgentAdapter"; +import { GroupChatMessage } from "./Events"; +import { AgentMessage } from "../abstractions/Messages"; + +/** + * Interface for sinks that can collect output messages. + */ +export interface IOutputCollectionSink { + /** + * Called when a message is collected. + * @param message The collected message + */ + onMessageCollected(message: AgentMessage): Promise; +} + +/** + * Agent that collects output messages from a group chat. + */ +export class OutputCollectorAgent extends HostableAgentAdapter { + private readonly sink: IOutputCollectionSink; + + /** + * Creates a new instance of OutputCollectorAgent. + */ + constructor( + agentId: AgentId, + runtime: IAgentRuntime, + sink: IOutputCollectionSink, + description: string = "Collects output messages from the group chat" + ) { + super(agentId, runtime, description); + this.sink = sink; + } + + /** + * Handles incoming messages by collecting them via the sink. + */ + async handleAsync(message: unknown, context: MessageContext): Promise { + if (message instanceof GroupChatMessage && message.message) { + await this.sink.onMessageCollected(message.message); + } + } +} diff --git a/typescript/src/agentchat/groupchat/RoundRobinGroupChat.ts b/typescript/src/agentchat/groupchat/RoundRobinGroupChat.ts new file mode 100644 index 00000000000..6e98aeac1ca --- /dev/null +++ b/typescript/src/agentchat/groupchat/RoundRobinGroupChat.ts @@ -0,0 +1,88 @@ +import { ITerminationCondition } from "../abstractions/Termination"; +import { GroupChatManagerBase } from "./GroupChatManagerBase"; +import { GroupChatOptions } from "./GroupChatOptions"; +import { GroupChatStart, GroupChatAgentResponse, GroupChatRequestPublish } from "./Events"; +import { MessageContext } from "../../contracts/MessageContext"; +import { ChatMessage } from "../abstractions/Messages"; + +/** + * A group chat implementation that sends messages to participants in round-robin order. + */ +export class RoundRobinGroupChat extends GroupChatManagerBase { + private lastParticipantIndex = -1; + + /** + * Creates a new instance of RoundRobinGroupChat. + * @param options Configuration options for the group chat + */ + constructor(options: GroupChatOptions) { + super(options); + } + + protected async handleStartAsync(message: GroupChatStart, context: MessageContext): Promise { + if (message.messages) { + this.messages.push(...message.messages); + } + + await this.publishNextAsync(); + } + + protected async handleResponseAsync(message: GroupChatAgentResponse, context: MessageContext): Promise { + const response = message.agentResponse; + + // Add ChatMessage type check + if (!(response.message instanceof ChatMessage)) { + throw new Error("Response message must be a ChatMessage"); + } + + this.messages.push(response.message); + + if (response.innerMessages) { + this.messages.push(...response.innerMessages); + } + + // Check termination condition before continuing + const stopMessage = await this.checkTerminationConditionAsync(this.messages); + if (stopMessage) { + this.messages.push(stopMessage); + return; + } + + await this.publishNextAsync(); + } + + private async publishNextAsync(): Promise { + // Get next participant in round-robin order + this.lastParticipantIndex = (this.lastParticipantIndex + 1) % this.options.participants.size; + const participantEntry = Array.from(this.options.participants.entries())[this.lastParticipantIndex]; + if (!participantEntry) { + throw new Error("No participants available"); + } + + const [name, participant] = participantEntry; + await this.publishMessageAsync( + new GroupChatRequestPublish(), + participant.topicType + ); + } + + /** + * Creates a new RoundRobinGroupChat with specified options. + * @param groupChatTopicType Topic type for group chat messages + * @param outputTopicType Topic type for output messages + * @param terminationCondition Optional condition for chat termination + * @param maxTurns Optional maximum number of turns + * @returns A configured RoundRobinGroupChat instance + */ + static create( + groupChatTopicType: string, + outputTopicType: string, + terminationCondition?: ITerminationCondition, + maxTurns?: number + ): RoundRobinGroupChat { + const options = new GroupChatOptions(groupChatTopicType, outputTopicType); + options.terminationCondition = terminationCondition; + options.maxTurns = maxTurns; + return new RoundRobinGroupChat(options); + } +} diff --git a/typescript/src/agentchat/groupchat/TextMentionTermination.ts b/typescript/src/agentchat/groupchat/TextMentionTermination.ts new file mode 100644 index 00000000000..3e1f2618ae3 --- /dev/null +++ b/typescript/src/agentchat/groupchat/TextMentionTermination.ts @@ -0,0 +1,42 @@ +import { ITerminationCondition } from "../abstractions/Termination"; +import { AgentMessage, ChatMessage, StopMessage } from "../abstractions/Messages"; + +/** + * Terminates a conversation when specific text is mentioned in a message. + */ +export class TextMentionTermination implements ITerminationCondition { + private _isTerminated = false; + + /** + * Creates a new instance of TextMentionTermination. + * @param searchText The text to search for in messages + * @param ignoreCase Whether to ignore case when searching + */ + constructor( + private readonly searchText: string, + private readonly ignoreCase: boolean = true + ) {} + + get isTerminated(): boolean { + return this._isTerminated; + } + + async checkAndUpdateAsync(messages: AgentMessage[]): Promise { + // Only check the last message + const lastMessage = messages[messages.length - 1]; + if (lastMessage instanceof ChatMessage) { + const content = this.ignoreCase ? lastMessage.content.toLowerCase() : lastMessage.content; + const searchFor = this.ignoreCase ? this.searchText.toLowerCase() : this.searchText; + + if (content.includes(searchFor)) { + this._isTerminated = true; + return new StopMessage(`Found termination text: ${this.searchText}`, lastMessage.source); + } + } + return null; + } + + reset(): void { + this._isTerminated = false; + } +} diff --git a/typescript/src/agentchat/groupchat/outputs/ConsoleOutputSink.ts b/typescript/src/agentchat/groupchat/outputs/ConsoleOutputSink.ts new file mode 100644 index 00000000000..1002d2b64b1 --- /dev/null +++ b/typescript/src/agentchat/groupchat/outputs/ConsoleOutputSink.ts @@ -0,0 +1,17 @@ +import { AgentMessage, ChatMessage } from "../../abstractions/Messages"; +import { IOutputCollectionSink } from "../OutputCollectorAgent"; + +/** + * A simple output sink that writes messages to the console. + */ +export class ConsoleOutputSink implements IOutputCollectionSink { + /** + * Called when a message is collected. + * @param message The collected message + */ + async onMessageCollected(message: AgentMessage): Promise { + if (message instanceof ChatMessage) { + console.log(`[${message.source ?? 'unknown'}]: ${message.content}`); + } + } +} diff --git a/typescript/src/agentchat/terminations/StopMessageTermination.ts b/typescript/src/agentchat/terminations/StopMessageTermination.ts new file mode 100644 index 00000000000..1aa7c001f2b --- /dev/null +++ b/typescript/src/agentchat/terminations/StopMessageTermination.ts @@ -0,0 +1,36 @@ +import { ITerminationCondition } from "../abstractions/Termination"; +import { AgentMessage, StopMessage } from "../abstractions/Messages"; + +/** + * Terminate a conversation if a StopMessage is received. + */ +export class StopMessageTermination implements ITerminationCondition { + private _isTerminated = false; + + /** + * Gets whether the chat has already terminated. + */ + get isTerminated(): boolean { + return this._isTerminated; + } + + /** + * Checks if new messages should cause termination of the chat. + * @param messages The messages to check + * @returns A StopMessage if a stop message is found, null otherwise + */ + async checkAndUpdateAsync(messages: AgentMessage[]): Promise { + const stopMessage = messages.find(m => m instanceof StopMessage) as StopMessage | undefined; + if (stopMessage) { + this._isTerminated = true; + } + return stopMessage ?? null; + } + + /** + * Resets the termination condition to its initial state. + */ + reset(): void { + this._isTerminated = false; + } +} diff --git a/typescript/src/contracts/KVStringParseHelper.ts b/typescript/src/contracts/KVStringParseHelper.ts new file mode 100644 index 00000000000..0511d451637 --- /dev/null +++ b/typescript/src/contracts/KVStringParseHelper.ts @@ -0,0 +1,32 @@ +/** + * Provides helper methods for parsing key-value string representations. + */ +export class KVStringParseHelper { + /** + * The regular expression pattern used to match key-value pairs in the format "key/value". + */ + private static readonly KV_PAIR_PATTERN = /^(?\w+)\/(?\w+)$/; + + /** + * Parses a string in the format "key/value" into a tuple containing the key and value. + * @param kvString The input string containing a key-value pair + * @param keyName The expected name of the key component + * @param valueName The expected name of the value component + * @returns A tuple containing the extracted key and value + * @throws Error if the input string does not match the expected "key/value" format + * @example + * ```typescript + * const input = "agent1/12345"; + * const result = KVStringParseHelper.toKVPair(input, "Type", "Key"); + * console.log(result[0]); // Outputs: agent1 + * console.log(result[1]); // Outputs: 12345 + * ``` + */ + public static toKVPair(kvString: string, keyName: string, valueName: string): [string, string] { + const match = this.KV_PAIR_PATTERN.exec(kvString); + if (match?.groups) { + return [match.groups['key'], match.groups['value']]; + } + throw new Error(`Invalid key-value pair format: ${kvString}; expecting "${keyName}/${valueName}"`); + } +}