Skip to content

Commit

Permalink
agents tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet committed Feb 24, 2025
1 parent 2f35a82 commit 76a7954
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 30 deletions.
5 changes: 4 additions & 1 deletion typescript/src/contracts/IAgentRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,8 @@ export interface IAgentRuntime {

addSubscriptionAsync(subscription: ISubscriptionDefinition): Promise<void>;

removeSubscriptionAsync(subscriptionId: string): Promise<void>; // Add missing method
removeSubscriptionAsync(subscriptionId: string): Promise<void>;

start(): Promise<void>;
stop(): Promise<void>;
}
108 changes: 88 additions & 20 deletions typescript/src/core/InProcessRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,31 @@ export class InProcessRuntime implements IAgentRuntime {
private agentFactories = new Map<string, (agentId: AgentId, runtime: IAgentRuntime) => Promise<IAgent>>();
private messageDeliveryQueue: MessageDelivery[] = [];
private isRunning = false;
private messageProcessor?: ReturnType<typeof setInterval>;

async start(): Promise<void> {
if (this.isRunning) {
throw new Error("Runtime is already running");
}
this.isRunning = true;
// Start continuous message processing
this.messageProcessor = setInterval(() => {
if (this.messageDeliveryQueue.length > 0) {
this.processNextMessage();
}
}, 10);
}

async stop(): Promise<void> {
if (!this.isRunning) {
throw new Error("Runtime not running");
}
if (this.messageProcessor) {
clearInterval(this.messageProcessor);
this.messageProcessor = undefined;
}
this.isRunning = false;
}

private async publishMessageServicer(envelope: MessageEnvelope, deliveryToken?: AbortSignal): Promise<void> {
if (!envelope.topic) {
Expand All @@ -28,7 +53,8 @@ export class InProcessRuntime implements IAgentRuntime {
topic,
sender,
subscriptionCount: this.subscriptions.size,
deliverToSelf: this.deliverToSelf
deliverToSelf: this.deliverToSelf,
isRunning: this.isRunning
});

for (const [id, subscription] of this.subscriptions.entries()) {
Expand Down Expand Up @@ -87,28 +113,25 @@ export class InProcessRuntime implements IAgentRuntime {
throw new Error("Message must have a receiver to be sent.");
}

console.log('sendMessageServicer:', {
receiver: envelope.receiver,
message: envelope.message,
isRpc: true
});

const context = new MessageContext(envelope.messageId, envelope.cancellation);
context.sender = envelope.sender;
context.isRpc = false;
context.isRpc = true;

const agent = await this.ensureAgentAsync(envelope.receiver);
return await agent.onMessageAsync(envelope.message, context);
}

private async ensureAgentAsync(agentId: AgentId): Promise<IAgent> {
const key = `${agentId.type}:${agentId.key}`;
let agent = this.agentInstances.get(key);

if (!agent) {
const factory = this.agentFactories.get(agentId.type);
if (!factory) {
throw new UndeliverableException(`Agent type ${agentId.type} not found`);
}
agent = await factory(agentId, this);
this.agentInstances.set(key, agent);
}
console.log('Found agent for RPC:', {
agentId: agent.id,
agentType: agent.constructor.name
});

return agent;
const response = await agent.onMessageAsync(envelope.message, context);
console.log('RPC response:', { response });
return response;
}

async publishMessageAsync(
Expand Down Expand Up @@ -136,12 +159,25 @@ export class InProcessRuntime implements IAgentRuntime {
messageId?: string,
cancellation?: AbortSignal
): Promise<unknown> {
if (!this.isRunning) {
throw new Error("Runtime not started");
}

console.log('Sending message:', { message, recipient, sender, isRunning: this.isRunning });

const delivery = new MessageEnvelope(message, messageId, cancellation)
.withSender(sender)
.forSend(recipient, (env, cancel) => this.sendMessageServicer(env, cancel));

this.messageDeliveryQueue.push(delivery);
return delivery.future;

// Process the message immediately instead of waiting for timer
await this.processNextMessage();

// Wait for and return the result
const result = await delivery.future;
console.log('Send completed:', { result });
return result;
}

async getAgentMetadataAsync(agentId: AgentId): Promise<unknown> {
Expand Down Expand Up @@ -171,6 +207,7 @@ export class InProcessRuntime implements IAgentRuntime {
type: AgentType,
factoryFunc: (agentId: AgentId, runtime: IAgentRuntime) => Promise<IAgent>
): Promise<AgentType> {
console.log('Registering agent factory:', { type, existingTypes: Array.from(this.agentFactories.keys()) });
if (this.agentFactories.has(type)) {
throw new Error(`Agent type ${type} already registered`);
}
Expand All @@ -179,9 +216,24 @@ export class InProcessRuntime implements IAgentRuntime {
}

private async processNextMessage(cancellation?: AbortSignal): Promise<void> {
console.log('Processing message:', {
queueLength: this.messageDeliveryQueue.length,
isRunning: this.isRunning
});

if (!this.isRunning) {
console.warn("Attempted to process message when runtime not running");
return;
}

const delivery = this.messageDeliveryQueue.shift();
if (delivery) {
await delivery.invokeAsync(cancellation);
try {
await delivery.invokeAsync(cancellation);
} catch (error) {
console.error("Error processing message:", error);
throw error; // Re-throw to ensure errors propagate
}
}
}

Expand All @@ -198,4 +250,20 @@ export class InProcessRuntime implements IAgentRuntime {
async tryGetAgentProxyAsync(agentId: AgentId): Promise<AgentProxy> {
return new AgentProxy(agentId, this);
}

private async ensureAgentAsync(agentId: AgentId): Promise<IAgent> {
const key = `${agentId.type}:${agentId.key}`;
let agent = this.agentInstances.get(key);

if (!agent) {
const factory = this.agentFactories.get(agentId.type);
if (!factory) {
throw new UndeliverableException(`Agent type ${agentId.type} not found`);
}
agent = await factory(agentId, this);
this.agentInstances.set(key, agent);
}

return agent;
}
}
29 changes: 22 additions & 7 deletions typescript/test/core/Agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import { BaseAgent } from '../../src/core/BaseAgent';
import { AgentId, IAgentRuntime } from '../../src/contracts/IAgentRuntime';
import { MessageContext } from '../../src/contracts/MessageContext';
import { IHandle } from '../../src/contracts/IHandle';
import { TextMessage } from './TestAgent';
import { TextMessage, RpcTextMessage } from './TestAgent';
import { TypeSubscription } from '../../src/core/TypeSubscription';
import { TestAgent, SubscribedAgent } from './TestAgent';

describe('Agent', () => {
it('should not receive messages when not subscribed', async () => {
const runtime = new InProcessRuntime();
await runtime.start();
let agent: TestAgent;

await runtime.registerAgentFactoryAsync("MyAgent", async (id, runtime) => {
Expand All @@ -34,6 +35,7 @@ describe('Agent', () => {

it('should receive messages when subscribed', async () => {
const runtime = new InProcessRuntime();
await runtime.start();
let agent: SubscribedAgent;

await runtime.registerAgentFactoryAsync("MyAgent", async (id, runtime) => {
Expand All @@ -56,26 +58,36 @@ describe('Agent', () => {
// Wait longer for message processing
await new Promise(resolve => setTimeout(resolve, 500));
expect(Object.keys(agent!.ReceivedMessages).length).toBe(1);

await runtime.stop();
}, 15000);

it('should return response for sendMessage', async () => {
const runtime = new InProcessRuntime();
let agent: TestAgent;
await runtime.start();
console.log('Runtime started');

let agent: TestAgent;
await runtime.registerAgentFactoryAsync("MyAgent", async (id, runtime) => {
agent = new TestAgent(id, runtime);
console.log('Created test agent:', { id, agentType: agent.constructor.name });
return agent;
});

const agentId = { type: "MyAgent", key: "default" };
const agentId = { type: "MyAgent", key: "test" };
await runtime.getAgentMetadataAsync(agentId);
console.log('Agent metadata retrieved');

const message: RpcTextMessage = { source: "TestTopic", content: "Request" };
console.log('Sending RPC message:', message);

const message = { source: "TestTopic", content: "Request" };
const response = await runtime.sendMessageAsync(message, agentId);

console.log('Send message response:', response);
console.log('RPC response received:', response);

expect(response).toBe("Request");
}, 30000); // Increase timeout

await runtime.stop();
}, 15000);

it('should handle subscribe and remove subscription correctly', async () => {
class ReceiverAgent extends BaseAgent implements IHandle<string> {
Expand All @@ -93,6 +105,7 @@ describe('Agent', () => {
}

const runtime = new InProcessRuntime();
await runtime.start();
let agent: ReceiverAgent;

await runtime.registerAgentFactoryAsync("MyAgent", async (id, runtime) => {
Expand Down Expand Up @@ -129,5 +142,7 @@ describe('Agent', () => {
await new Promise(resolve => setTimeout(resolve, 500));

expect(agent!.receivedItems.length).toBe(1);

await runtime.stop();
}, 15000);
});
24 changes: 22 additions & 2 deletions typescript/test/core/InProcessRuntime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class ThirdSubscribedAgent extends BaseAgent {
describe('InProcessRuntime', () => {
it('should not deliver to self by default', async () => {
const runtime = new InProcessRuntime();
console.log('Starting runtime...');
await runtime.start(); // Add explicit start like .NET version

let agent: SubscribedSelfPublishAgent | undefined;

// Register and create agent with description
Expand All @@ -44,18 +47,35 @@ describe('InProcessRuntime', () => {
return agent;
});

// Ensure agent is created
console.log('Agent registered, ensuring creation...');
await runtime.getAgentMetadataAsync(agentId);
expect(agent).toBeDefined();
if (!agent) throw new Error("Agent not initialized");

console.log('Agent state before subscription:', {
Text: agent.Text,
agentId: agent.id
});

// Add subscription
await runtime.addSubscriptionAsync(new TypeSubscriptionAttribute("TestTopic").bind("MyAgent"));
const sub = new TypeSubscriptionAttribute("TestTopic").bind("MyAgent");
await runtime.addSubscriptionAsync(sub);
console.log('Added subscription:', {
id: sub.id,
agentType: "MyAgent",
topic: "TestTopic"
});

// Send message that will trigger self-publish
console.log('Sending initial message...');
await runtime.publishMessageAsync("SelfMessage", { type: "TestTopic", source: "test" });
await new Promise(resolve => setTimeout(resolve, 100));

console.log('Final agent state:', {
Text: agent.Text,
defaultText: { source: "DefaultTopic", content: "DefaultContent" }
});

// Verify the text remains default (self-message wasn't delivered)
expect(agent.Text.source).toBe("DefaultTopic");
expect(agent.Text.content).toBe("DefaultContent");
Expand Down

0 comments on commit 76a7954

Please sign in to comment.