diff --git a/typescript/src/contracts/IAgentRuntime.ts b/typescript/src/contracts/IAgentRuntime.ts index cf25f12fd63..0ac7b308a58 100644 --- a/typescript/src/contracts/IAgentRuntime.ts +++ b/typescript/src/contracts/IAgentRuntime.ts @@ -45,5 +45,8 @@ export interface IAgentRuntime { addSubscriptionAsync(subscription: ISubscriptionDefinition): Promise; - removeSubscriptionAsync(subscriptionId: string): Promise; // Add missing method + removeSubscriptionAsync(subscriptionId: string): Promise; + + start(): Promise; + stop(): Promise; } \ No newline at end of file diff --git a/typescript/src/core/InProcessRuntime.ts b/typescript/src/core/InProcessRuntime.ts index 60dd9baa725..ee278d8577a 100644 --- a/typescript/src/core/InProcessRuntime.ts +++ b/typescript/src/core/InProcessRuntime.ts @@ -14,6 +14,31 @@ export class InProcessRuntime implements IAgentRuntime { private agentFactories = new Map Promise>(); private messageDeliveryQueue: MessageDelivery[] = []; private isRunning = false; + private messageProcessor?: ReturnType; + + async start(): Promise { + if (this.isRunning) { + throw new Error("Runtime is already running"); + } + this.isRunning = true; + // Start continuous message processing + this.messageProcessor = setInterval(() => { + if (this.messageDeliveryQueue.length > 0) { + this.processNextMessage(); + } + }, 10); + } + + async stop(): Promise { + if (!this.isRunning) { + throw new Error("Runtime not running"); + } + if (this.messageProcessor) { + clearInterval(this.messageProcessor); + this.messageProcessor = undefined; + } + this.isRunning = false; + } private async publishMessageServicer(envelope: MessageEnvelope, deliveryToken?: AbortSignal): Promise { if (!envelope.topic) { @@ -28,7 +53,8 @@ export class InProcessRuntime implements IAgentRuntime { topic, sender, subscriptionCount: this.subscriptions.size, - deliverToSelf: this.deliverToSelf + deliverToSelf: this.deliverToSelf, + isRunning: this.isRunning }); for (const [id, subscription] of this.subscriptions.entries()) { @@ -87,28 +113,25 @@ export class InProcessRuntime implements IAgentRuntime { throw new Error("Message must have a receiver to be sent."); } + console.log('sendMessageServicer:', { + receiver: envelope.receiver, + message: envelope.message, + isRpc: true + }); + const context = new MessageContext(envelope.messageId, envelope.cancellation); context.sender = envelope.sender; - context.isRpc = false; + context.isRpc = true; const agent = await this.ensureAgentAsync(envelope.receiver); - return await agent.onMessageAsync(envelope.message, context); - } - - private async ensureAgentAsync(agentId: AgentId): Promise { - const key = `${agentId.type}:${agentId.key}`; - let agent = this.agentInstances.get(key); - - if (!agent) { - const factory = this.agentFactories.get(agentId.type); - if (!factory) { - throw new UndeliverableException(`Agent type ${agentId.type} not found`); - } - agent = await factory(agentId, this); - this.agentInstances.set(key, agent); - } + console.log('Found agent for RPC:', { + agentId: agent.id, + agentType: agent.constructor.name + }); - return agent; + const response = await agent.onMessageAsync(envelope.message, context); + console.log('RPC response:', { response }); + return response; } async publishMessageAsync( @@ -136,12 +159,25 @@ export class InProcessRuntime implements IAgentRuntime { messageId?: string, cancellation?: AbortSignal ): Promise { + if (!this.isRunning) { + throw new Error("Runtime not started"); + } + + console.log('Sending message:', { message, recipient, sender, isRunning: this.isRunning }); + const delivery = new MessageEnvelope(message, messageId, cancellation) .withSender(sender) .forSend(recipient, (env, cancel) => this.sendMessageServicer(env, cancel)); this.messageDeliveryQueue.push(delivery); - return delivery.future; + + // Process the message immediately instead of waiting for timer + await this.processNextMessage(); + + // Wait for and return the result + const result = await delivery.future; + console.log('Send completed:', { result }); + return result; } async getAgentMetadataAsync(agentId: AgentId): Promise { @@ -171,6 +207,7 @@ export class InProcessRuntime implements IAgentRuntime { type: AgentType, factoryFunc: (agentId: AgentId, runtime: IAgentRuntime) => Promise ): Promise { + console.log('Registering agent factory:', { type, existingTypes: Array.from(this.agentFactories.keys()) }); if (this.agentFactories.has(type)) { throw new Error(`Agent type ${type} already registered`); } @@ -179,9 +216,24 @@ export class InProcessRuntime implements IAgentRuntime { } private async processNextMessage(cancellation?: AbortSignal): Promise { + console.log('Processing message:', { + queueLength: this.messageDeliveryQueue.length, + isRunning: this.isRunning + }); + + if (!this.isRunning) { + console.warn("Attempted to process message when runtime not running"); + return; + } + const delivery = this.messageDeliveryQueue.shift(); if (delivery) { - await delivery.invokeAsync(cancellation); + try { + await delivery.invokeAsync(cancellation); + } catch (error) { + console.error("Error processing message:", error); + throw error; // Re-throw to ensure errors propagate + } } } @@ -198,4 +250,20 @@ export class InProcessRuntime implements IAgentRuntime { async tryGetAgentProxyAsync(agentId: AgentId): Promise { return new AgentProxy(agentId, this); } + + private async ensureAgentAsync(agentId: AgentId): Promise { + const key = `${agentId.type}:${agentId.key}`; + let agent = this.agentInstances.get(key); + + if (!agent) { + const factory = this.agentFactories.get(agentId.type); + if (!factory) { + throw new UndeliverableException(`Agent type ${agentId.type} not found`); + } + agent = await factory(agentId, this); + this.agentInstances.set(key, agent); + } + + return agent; + } } \ No newline at end of file diff --git a/typescript/test/core/Agent.test.ts b/typescript/test/core/Agent.test.ts index 618f303885e..7cf0b4ee12d 100644 --- a/typescript/test/core/Agent.test.ts +++ b/typescript/test/core/Agent.test.ts @@ -4,13 +4,14 @@ import { BaseAgent } from '../../src/core/BaseAgent'; import { AgentId, IAgentRuntime } from '../../src/contracts/IAgentRuntime'; import { MessageContext } from '../../src/contracts/MessageContext'; import { IHandle } from '../../src/contracts/IHandle'; -import { TextMessage } from './TestAgent'; +import { TextMessage, RpcTextMessage } from './TestAgent'; import { TypeSubscription } from '../../src/core/TypeSubscription'; import { TestAgent, SubscribedAgent } from './TestAgent'; describe('Agent', () => { it('should not receive messages when not subscribed', async () => { const runtime = new InProcessRuntime(); + await runtime.start(); let agent: TestAgent; await runtime.registerAgentFactoryAsync("MyAgent", async (id, runtime) => { @@ -34,6 +35,7 @@ describe('Agent', () => { it('should receive messages when subscribed', async () => { const runtime = new InProcessRuntime(); + await runtime.start(); let agent: SubscribedAgent; await runtime.registerAgentFactoryAsync("MyAgent", async (id, runtime) => { @@ -56,26 +58,36 @@ describe('Agent', () => { // Wait longer for message processing await new Promise(resolve => setTimeout(resolve, 500)); expect(Object.keys(agent!.ReceivedMessages).length).toBe(1); + + await runtime.stop(); }, 15000); it('should return response for sendMessage', async () => { const runtime = new InProcessRuntime(); - let agent: TestAgent; + await runtime.start(); + console.log('Runtime started'); + let agent: TestAgent; await runtime.registerAgentFactoryAsync("MyAgent", async (id, runtime) => { agent = new TestAgent(id, runtime); + console.log('Created test agent:', { id, agentType: agent.constructor.name }); return agent; }); - const agentId = { type: "MyAgent", key: "default" }; + const agentId = { type: "MyAgent", key: "test" }; await runtime.getAgentMetadataAsync(agentId); + console.log('Agent metadata retrieved'); + + const message: RpcTextMessage = { source: "TestTopic", content: "Request" }; + console.log('Sending RPC message:', message); - const message = { source: "TestTopic", content: "Request" }; const response = await runtime.sendMessageAsync(message, agentId); - - console.log('Send message response:', response); + console.log('RPC response received:', response); + expect(response).toBe("Request"); - }, 30000); // Increase timeout + + await runtime.stop(); + }, 15000); it('should handle subscribe and remove subscription correctly', async () => { class ReceiverAgent extends BaseAgent implements IHandle { @@ -93,6 +105,7 @@ describe('Agent', () => { } const runtime = new InProcessRuntime(); + await runtime.start(); let agent: ReceiverAgent; await runtime.registerAgentFactoryAsync("MyAgent", async (id, runtime) => { @@ -129,5 +142,7 @@ describe('Agent', () => { await new Promise(resolve => setTimeout(resolve, 500)); expect(agent!.receivedItems.length).toBe(1); + + await runtime.stop(); }, 15000); }); diff --git a/typescript/test/core/InProcessRuntime.test.ts b/typescript/test/core/InProcessRuntime.test.ts index 2ab2f57c6ec..39f8b2c1b23 100644 --- a/typescript/test/core/InProcessRuntime.test.ts +++ b/typescript/test/core/InProcessRuntime.test.ts @@ -35,6 +35,9 @@ class ThirdSubscribedAgent extends BaseAgent { describe('InProcessRuntime', () => { it('should not deliver to self by default', async () => { const runtime = new InProcessRuntime(); + console.log('Starting runtime...'); + await runtime.start(); // Add explicit start like .NET version + let agent: SubscribedSelfPublishAgent | undefined; // Register and create agent with description @@ -44,18 +47,35 @@ describe('InProcessRuntime', () => { return agent; }); - // Ensure agent is created + console.log('Agent registered, ensuring creation...'); await runtime.getAgentMetadataAsync(agentId); expect(agent).toBeDefined(); if (!agent) throw new Error("Agent not initialized"); + console.log('Agent state before subscription:', { + Text: agent.Text, + agentId: agent.id + }); + // Add subscription - await runtime.addSubscriptionAsync(new TypeSubscriptionAttribute("TestTopic").bind("MyAgent")); + const sub = new TypeSubscriptionAttribute("TestTopic").bind("MyAgent"); + await runtime.addSubscriptionAsync(sub); + console.log('Added subscription:', { + id: sub.id, + agentType: "MyAgent", + topic: "TestTopic" + }); // Send message that will trigger self-publish + console.log('Sending initial message...'); await runtime.publishMessageAsync("SelfMessage", { type: "TestTopic", source: "test" }); await new Promise(resolve => setTimeout(resolve, 100)); + console.log('Final agent state:', { + Text: agent.Text, + defaultText: { source: "DefaultTopic", content: "DefaultContent" } + }); + // Verify the text remains default (self-message wasn't delivered) expect(agent.Text.source).toBe("DefaultTopic"); expect(agent.Text.content).toBe("DefaultContent");