diff --git a/Lapine.Core.IntegrationTests/BrokerProxy.cs b/Lapine.Core.IntegrationTests/BrokerProxy.cs old mode 100755 new mode 100644 diff --git a/Lapine.Core.IntegrationTests/Client/ConnectionTests.cs b/Lapine.Core.IntegrationTests/Client/ConnectionTests.cs old mode 100755 new mode 100644 diff --git a/Lapine.Core/Agents/Agent.cs b/Lapine.Core/Agents/Agent.cs index 12fed88..de086d5 100644 --- a/Lapine.Core/Agents/Agent.cs +++ b/Lapine.Core/Agents/Agent.cs @@ -3,29 +3,24 @@ namespace Lapine.Agents; using System.Runtime.CompilerServices; using System.Threading.Channels; -interface IAgent { - ValueTask PostAsync(Object message, CancellationToken cancellationToken = default); - ValueTask PostAndReplyAsync(Object message); +interface IAgent { + ValueTask PostAsync(TProtocol message, CancellationToken cancellationToken = default); + ValueTask PostAndReplyAsync(Func messageFactory); ValueTask StopAsync(); } -class AsyncReplyChannel { - readonly Action _reply; - - public AsyncReplyChannel(Action reply) => - _reply = reply ?? throw new ArgumentNullException(nameof(reply)); - - public void Reply(Object response) => _reply(response); +class AsyncReplyChannel(Action reply) { + public void Reply(Object response) => reply(response); } -class Agent : IAgent { - readonly Channel _mailbox; +class Agent : IAgent { + readonly Channel _mailbox; readonly Task _messageLoop; - Agent(Channel mailbox, Behaviour initialBehaviour) { + Agent(Channel mailbox, Behaviour initialBehaviour) { _mailbox = mailbox; _messageLoop = Task.Factory.StartNew(async () => { - var context = new MessageContext(this, initialBehaviour, null!); + var context = new MessageContext(this, initialBehaviour, default!); while (await _mailbox.Reader.WaitToReadAsync()) { var message = await _mailbox.Reader.ReadAsync(); @@ -34,21 +29,24 @@ class Agent : IAgent { }); } - static public IAgent StartNew(Behaviour initialBehaviour) { - var mailbox = Channel.CreateUnbounded(new UnboundedChannelOptions { + static public IAgent StartNew(Behaviour initialBehaviour) { + var mailbox = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); - return new Agent(mailbox, initialBehaviour); + return new Agent(mailbox, initialBehaviour); } - public async ValueTask PostAsync(Object message, CancellationToken cancellationToken = default) => + public async ValueTask PostAsync(TProtocol message, CancellationToken cancellationToken = default) => await _mailbox.Writer.WriteAsync(message, cancellationToken); - public async ValueTask PostAndReplyAsync(Object message) { + public async ValueTask PostAndReplyAsync(Func messageFactory) { + ArgumentNullException.ThrowIfNull(messageFactory); + var promise = AsyncValueTaskMethodBuilder.Create(); var replyChannel = new AsyncReplyChannel(reply => promise.SetResult(reply)); + var message = messageFactory(replyChannel); - await _mailbox.Writer.WriteAsync((message, replyChannel)); + await _mailbox.Writer.WriteAsync(message); return await promise.Task; } diff --git a/Lapine.Core/Agents/AmqpClientAgent.cs b/Lapine.Core/Agents/AmqpClientAgent.cs index e84bff1..7701232 100644 --- a/Lapine.Core/Agents/AmqpClientAgent.cs +++ b/Lapine.Core/Agents/AmqpClientAgent.cs @@ -6,27 +6,32 @@ namespace Lapine.Agents; using Lapine.Client; using Lapine.Protocol; -using static Lapine.Agents.ChannelAgent.Protocol; -using static Lapine.Agents.DispatcherAgent.Protocol; -using static Lapine.Agents.HandshakeAgent.Protocol; -using static Lapine.Agents.HeartbeatAgent.Protocol; -using static Lapine.Agents.AmqpClientAgent.Protocol; -using static Lapine.Agents.SocketAgent.Protocol; - -static class AmqpClientAgent { - static public class Protocol { - public record EstablishConnection(ConnectionConfiguration Configuration, CancellationToken CancellationToken = default); - public record OpenChannel(CancellationToken CancellationToken = default); - public record Disconnect; - } - - static public IAgent Create() => - Agent.StartNew(Disconnected()); - - static Behaviour Disconnected() => +interface IAmqpClientAgent { + Task EstablishConnection(ConnectionConfiguration configuration, CancellationToken cancellationToken = default); + Task OpenChannel(CancellationToken cancellationToken = default); + Task Disconnect(); + Task Stop(); +} + +class AmqpClientAgent : IAmqpClientAgent { + readonly IAgent _agent; + + AmqpClientAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); + + abstract record Protocol; + record EstablishConnection(ConnectionConfiguration Configuration, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record OpenChannel(AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record Disconnect(AsyncReplyChannel ReplyChannel) : Protocol; + record HeartbeatEventEventReceived(Object Message) : Protocol; + + static public IAmqpClientAgent Create() => + new AmqpClientAgent(Agent.StartNew(Disconnected())); + + static Behaviour Disconnected() => async context => { switch (context.Message) { - case (EstablishConnection(var connectionConfiguration, var cancellationToken), AsyncReplyChannel replyChannel): { + case EstablishConnection(var connectionConfiguration, var replyChannel, var cancellationToken): { var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(connectionConfiguration.ConnectionTimeout); @@ -43,7 +48,7 @@ static Behaviour Disconnected() => var endpoint = remainingEndpoints.Dequeue(); var socketAgent = SocketAgent.Create(); - switch (await socketAgent.PostAndReplyAsync(new Connect(endpoint, cts.Token))) { + switch (await socketAgent.ConnectAsync(endpoint, cts.Token)) { case ConnectionFailed(var fault) when remainingEndpoints.Any(): { accumulatedFailures.Add(fault); continue; @@ -55,7 +60,7 @@ static Behaviour Disconnected() => } case Connected(var connectionEvents, var receivedFrames): { var dispatcher = DispatcherAgent.Create(); - await dispatcher.PostAsync(new DispatchTo(socketAgent, 0)); + await dispatcher.DispatchTo(socketAgent, 0); var handshakeAgent = HandshakeAgent.Create( receivedFrames : receivedFrames, @@ -64,30 +69,22 @@ static Behaviour Disconnected() => cancellationToken: cts.Token ); - switch (await handshakeAgent.PostAndReplyAsync(new StartHandshake(connectionConfiguration))) { - case ConnectionAgreement connectionAgreement: { - await socketAgent.PostAsync(new Tune(connectionAgreement.MaxFrameSize)); + switch (await handshakeAgent.StartHandshake(connectionConfiguration)) { + case HandshakeAgent.ConnectionAgreed(var connectionAgreement): { + await socketAgent.Tune(connectionAgreement.MaxFrameSize); var heartbeatAgent = HeartbeatAgent.Create(); if (connectionConfiguration.ConnectionIntegrityStrategy.HeartbeatFrequency.HasValue) { - var heartbeatEvents = (IObservable) await heartbeatAgent.PostAndReplyAsync(new StartHeartbeat( - ReceivedFrames: receivedFrames, - Dispatcher : dispatcher, - Frequency : connectionConfiguration.ConnectionIntegrityStrategy.HeartbeatFrequency.Value - )); - heartbeatEvents.Subscribe(onNext: message => context.Self.PostAndReplyAsync(message)); + var heartbeatEvents = await heartbeatAgent.Start(receivedFrames, dispatcher, connectionConfiguration.ConnectionIntegrityStrategy.HeartbeatFrequency.Value); + heartbeatEvents.Subscribe(onNext: message => context.Self.PostAsync(new HeartbeatEventEventReceived(message))); } // If tcp keepalives are enabled, configure the socket... if (connectionConfiguration.ConnectionIntegrityStrategy.KeepAliveSettings.HasValue) { var (probeTime, retryInterval, retryCount) = connectionConfiguration.ConnectionIntegrityStrategy.KeepAliveSettings.Value; - await socketAgent.PostAsync(new EnableTcpKeepAlives( - ProbeTime : probeTime, - RetryInterval: retryInterval, - RetryCount : retryCount - ), cts.Token); + await socketAgent.EnableTcpKeepAlives(probeTime, retryInterval, retryCount); } replyChannel.Reply(true); @@ -106,7 +103,7 @@ await socketAgent.PostAsync(new EnableTcpKeepAlives( ) }; } - case Exception fault: { + case HandshakeAgent.HandshakeFailed(var fault): { replyChannel.Reply(fault); return context; } @@ -118,7 +115,7 @@ await socketAgent.PostAsync(new EnableTcpKeepAlives( } return context; } - case (Protocol.Disconnect, AsyncReplyChannel replyChannel): { + case Disconnect(var replyChannel): { replyChannel.Reply(true); return context; } @@ -126,43 +123,35 @@ await socketAgent.PostAsync(new EnableTcpKeepAlives( } }; - static Behaviour Connected(ConnectionConfiguration connectionConfiguration, IAgent socketAgent, IAgent heartbeatAgent, IObservable receivedFrames, IObservable connectionEvents, IAgent dispatcher, IImmutableList availableChannelIds) => + static Behaviour Connected(ConnectionConfiguration connectionConfiguration, ISocketAgent socketAgent, IHeartbeatAgent heartbeatAgent, IObservable receivedFrames, IObservable connectionEvents, IDispatcherAgent dispatcher, IImmutableList availableChannelIds) => async context => { switch (context.Message) { - case RemoteFlatline: { - await heartbeatAgent.StopAsync(); - await dispatcher.StopAsync(); - await socketAgent.PostAsync(new SocketAgent.Protocol.Disconnect()); + case HeartbeatEventEventReceived(RemoteFlatline): { + await heartbeatAgent.Stop(); + await dispatcher.Stop(); + await socketAgent.Disconnect(); await socketAgent.StopAsync(); return context with { Behaviour = Disconnected() }; } - case (AmqpClientAgent.Protocol.Disconnect, AsyncReplyChannel replyChannel): { - await heartbeatAgent.StopAsync(); - await dispatcher.StopAsync(); - await socketAgent.PostAsync(new SocketAgent.Protocol.Disconnect()); + case Disconnect(var replyChannel): { + await heartbeatAgent.Stop(); + await dispatcher.Stop(); + await socketAgent.Disconnect(); await socketAgent.StopAsync(); replyChannel.Reply(true); return context; } - case (OpenChannel(var cancellationToken), AsyncReplyChannel replyChannel): { + case OpenChannel(var replyChannel, var cancellationToken): { var channelId = availableChannelIds[0]; var channelAgent = ChannelAgent.Create(connectionConfiguration.MaximumFrameSize); using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(connectionConfiguration.CommandTimeout); - var command = new Open( - ChannelId : channelId, - ReceivedFrames : receivedFrames.Where(frame => frame.Channel == channelId), - ConnectionEvents : connectionEvents, - SocketAgent : socketAgent, - CancellationToken: cts.Token - ); - - switch (await channelAgent.PostAndReplyAsync(command)) { + switch (await channelAgent.Open(channelId,receivedFrames.Where(frame => frame.Channel == channelId), connectionEvents, socketAgent, cts.Token)) { case true: { replyChannel.Reply(channelAgent); return context with { @@ -180,4 +169,16 @@ static Behaviour Connected(ConnectionConfiguration connectionConfiguration, IAge default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Connected)}' behaviour."); } }; + + async Task IAmqpClientAgent.EstablishConnection(ConnectionConfiguration configuration, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new EstablishConnection(configuration, replyChannel, cancellationToken)); + + async Task IAmqpClientAgent.OpenChannel(CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new OpenChannel(replyChannel, cancellationToken)); + + async Task IAmqpClientAgent.Disconnect() => + await _agent.PostAndReplyAsync(replyChannel => new Disconnect(replyChannel)); + + async Task IAmqpClientAgent.Stop() => + await _agent.StopAsync(); } diff --git a/Lapine.Core/Agents/Behaviour.cs b/Lapine.Core/Agents/Behaviour.cs index 6d3be2c..45c9d7f 100644 --- a/Lapine.Core/Agents/Behaviour.cs +++ b/Lapine.Core/Agents/Behaviour.cs @@ -1,3 +1,3 @@ namespace Lapine.Agents; -delegate ValueTask Behaviour(MessageContext context); +delegate ValueTask> Behaviour(MessageContext context); diff --git a/Lapine.Core/Agents/ChannelAgent.cs b/Lapine.Core/Agents/ChannelAgent.cs index 20690a3..a5613e9 100644 --- a/Lapine.Core/Agents/ChannelAgent.cs +++ b/Lapine.Core/Agents/ChannelAgent.cs @@ -5,71 +5,89 @@ namespace Lapine.Agents; using Lapine.Protocol; using Lapine.Protocol.Commands; -using static Lapine.Agents.DispatcherAgent.Protocol; -using static Lapine.Agents.ChannelAgent.Protocol; -using static Lapine.Agents.ConsumerAgent.Protocol; -using static Lapine.Agents.PublishAgent.Protocol; - -static class ChannelAgent { - static public class Protocol { - public record Open(UInt16 ChannelId, IObservable ReceivedFrames, IObservable ConnectionEvents, IAgent SocketAgent, CancellationToken CancellationToken = default); - public record Close(CancellationToken CancellationToken = default); - public record DeclareExchange(ExchangeDefinition Definition, CancellationToken CancellationToken = default); - public record DeleteExchange(String Exchange, DeleteExchangeCondition Condition, CancellationToken CancellationToken = default); - public record DeclareQueue(QueueDefinition Definition, CancellationToken CancellationToken = default); - public record DeleteQueue(String Queue, DeleteQueueCondition Condition, CancellationToken CancellationToken = default); - public record BindQueue(Binding Binding, CancellationToken CancellationToken = default); - public record UnbindQueue(Binding Binding, CancellationToken CancellationToken = default); - public record PurgeQueue(String Queue, CancellationToken CancellationToken = default); - public record Publish( - String Exchange, - String RoutingKey, - RoutingFlags RoutingFlags, - (BasicProperties Properties, ReadOnlyMemory Body) Message, - CancellationToken CancellationToken - ); - public record GetMessage( - String Queue, - Acknowledgements Acknowledgements, - CancellationToken CancellationToken = default - ); - public record Acknowledge(UInt64 DeliveryTag, Boolean Multiple); - public record Reject(UInt64 DeliveryTag, Boolean Requeue); - public record SetPrefetchLimit(UInt16 Limit, Boolean Global, CancellationToken CancellationToken = default); - public record Consume( - String Queue, - ConsumerConfiguration ConsumerConfiguration, - IReadOnlyDictionary? Arguments, - CancellationToken CancellationToken = default - ); - public record EnablePublisherConfirms(CancellationToken CancellationToken = default); - } - - static public IAgent Create(UInt32 maxFrameSize) => - Agent.StartNew(Closed(maxFrameSize)); - - static Behaviour Closed(UInt32 maxFrameSize) => +interface IChannelAgent { + Task Open(UInt16 channelId, IObservable frameStream, IObservable connectionEvents, ISocketAgent socketAgent, CancellationToken cancellationToken = default); + Task Close(CancellationToken cancellationToken = default); + Task DeclareExchange(ExchangeDefinition definition, CancellationToken cancellationToken = default); + Task DeleteExchange(String exchange, DeleteExchangeCondition condition, CancellationToken cancellationToken = default); + Task DeclareQueue(QueueDefinition definition, CancellationToken cancellationToken = default); + Task DeleteQueue(String queue, DeleteQueueCondition condition, CancellationToken cancellationToken = default); + Task BindQueue(Binding binding, CancellationToken cancellationToken = default); + Task UnbindQueue(Binding binding, CancellationToken cancellationToken = default); + Task PurgeQueue(String queue, CancellationToken cancellationToken = default); + Task Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties, ReadOnlyMemory) message, CancellationToken cancellationToken = default); + Task GetMessage(String queue, Acknowledgements acknowledgements, CancellationToken cancellationToken = default); + + Task Acknowledge(UInt64 deliveryTag, Boolean multiple); + Task Reject(UInt64 deliveryTag, Boolean requeue); + Task SetPrefetchLimit(UInt16 limit, Boolean global, CancellationToken cancellationToken = default); + Task Consume(String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments, CancellationToken cancellationToken = default); + Task EnablePublisherConfirms(CancellationToken cancellationToken = default); +} + +class ChannelAgent : IChannelAgent { + readonly IAgent _agent; + + ChannelAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); + + abstract record Protocol; + record Open(UInt16 ChannelId, IObservable ReceivedFrames, IObservable ConnectionEvents, ISocketAgent SocketAgent, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record Close(AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record DeclareExchange(ExchangeDefinition Definition, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record DeleteExchange(String Exchange, DeleteExchangeCondition Condition, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record DeclareQueue(QueueDefinition Definition, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record DeleteQueue(String Queue, DeleteQueueCondition Condition, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record BindQueue(Binding Binding, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record UnbindQueue(Binding Binding, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record PurgeQueue(String Queue, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record Publish( + String Exchange, + String RoutingKey, + RoutingFlags RoutingFlags, + (BasicProperties Properties, ReadOnlyMemory Body) Message, + AsyncReplyChannel ReplyChannel, + CancellationToken CancellationToken + ) : Protocol; + record GetMessage( + String Queue, + Acknowledgements Acknowledgements, + AsyncReplyChannel ReplyChannel, + CancellationToken CancellationToken = default + ) : Protocol; + record Acknowledge(UInt64 DeliveryTag, Boolean Multiple, AsyncReplyChannel ReplyChannel) : Protocol; + record Reject(UInt64 DeliveryTag, Boolean Requeue, AsyncReplyChannel ReplyChannel) : Protocol; + record SetPrefetchLimit(UInt16 Limit, Boolean Global, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record Consume( + String Queue, + ConsumerConfiguration ConsumerConfiguration, + IReadOnlyDictionary? Arguments, + AsyncReplyChannel ReplyChannel, + CancellationToken CancellationToken = default + ) : Protocol; + record EnablePublisherConfirms(AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + + static public IChannelAgent Create(UInt32 maxFrameSize) => + new ChannelAgent(Agent.StartNew(Closed(maxFrameSize))); + + static Behaviour Closed(UInt32 maxFrameSize) => async context => { switch (context.Message) { - case (Open(var channelId, var receivedFrames, var connectionEvents, var socketAgent, var cancellationToken), AsyncReplyChannel replyChannel): { + case Open(var channelId, var receivedFrames, var connectionEvents, var socketAgent, var replyChannel, var cancellationToken): { var dispatcher = DispatcherAgent.Create(); - var consumers = ImmutableDictionary.Empty; - await dispatcher.PostAsync(new DispatchTo(socketAgent, channelId)); + var consumers = ImmutableDictionary.Empty; + await dispatcher.DispatchTo(socketAgent, channelId); - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); - switch (await processManager.PostAndReplyAsync(new ChannelOpen())) { - case ChannelOpenOk: { + switch (await processManager.Request(new ChannelOpen())) { + case Result.Ok: { replyChannel.Reply(true); - return context with { Behaviour = Open(maxFrameSize, receivedFrames, dispatcher, consumers) }; + return context with { Behaviour = OpenBehaviour(maxFrameSize, receivedFrames, dispatcher, consumers) }; } - case Exception fault: { - replyChannel.Reply(fault); - break; + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); + return context; } } break; @@ -79,35 +97,27 @@ static Behaviour Closed(UInt32 maxFrameSize) => return context; }; - static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, IAgent dispatcher, IImmutableDictionary consumers, UInt64 deliveryTag = 1, Boolean enablePublisherConfirms = false) => + static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable receivedFrames, IDispatcherAgent dispatcher, IImmutableDictionary consumers, UInt64 deliveryTag = 1, Boolean enablePublisherConfirms = false) => async context => { switch (context.Message) { - case (Close(var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case Close(var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); - switch (await processManager.PostAndReplyAsync(new ChannelClose(0, String.Empty, (0, 0)))) { - case ChannelCloseOk: { + switch (await processManager.Request(new ChannelClose(0, String.Empty, (0, 0)))) { + case Result.Ok: { replyChannel.Reply(true); await context.Self.StopAsync(); break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } return context; } - case (DeclareExchange(var exchange, var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case DeclareExchange(var exchange, var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); var command = new ExchangeDeclare( ExchangeName: exchange.Name, @@ -120,24 +130,20 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, Arguments : exchange.Arguments ); - switch (await processManager.PostAndReplyAsync(command)) { - case ExchangeDeclareOk: { + switch (await processManager.Request(command)) { + case Result.Ok: { replyChannel.Reply(true); break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } return context; } - case (DeleteExchange(var name, var condition, var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case DeleteExchange(var name, var condition, var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); var command = new ExchangeDelete( ExchangeName: name, @@ -145,24 +151,20 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, NoWait : false ); - switch (await processManager.PostAndReplyAsync(command)) { - case ExchangeDeleteOk: { + switch (await processManager.Request(command)) { + case Result.Ok: { replyChannel.Reply(true); break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } return context; } - case (DeclareQueue(var queue, var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case DeclareQueue(var queue, var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); var command = new QueueDeclare( QueueName : queue.Name, @@ -174,24 +176,20 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, Arguments : queue.Arguments ); - switch (await processManager.PostAndReplyAsync(command)) { - case QueueDeclareOk: { + switch (await processManager.Request(command)) { + case Result.Ok: { replyChannel.Reply(true); break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } return context; } - case (DeleteQueue(var name, var condition, var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case DeleteQueue(var name, var condition, var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); var command = new QueueDelete( QueueName: name, @@ -200,24 +198,20 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, NoWait : false ); - switch (await processManager.PostAndReplyAsync(command)) { - case QueueDeleteOk: { + switch (await processManager.Request(command)) { + case Result.Ok: { replyChannel.Reply(true); break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } return context; } - case (BindQueue(var binding, var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case BindQueue(var binding, var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); var command = new QueueBind( QueueName : binding.Queue, @@ -227,24 +221,20 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, Arguments : binding.Arguments ); - switch (await processManager.PostAndReplyAsync(command)) { - case QueueBindOk: { + switch (await processManager.Request(command)) { + case Result.Ok: { replyChannel.Reply(true); break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } return context; } - case (UnbindQueue(var binding, var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case UnbindQueue(var binding, var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); var command = new QueueUnbind( QueueName : binding.Queue, @@ -253,38 +243,34 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, Arguments : binding.Arguments ); - switch (await processManager.PostAndReplyAsync(command)) { - case QueueUnbindOk: { + switch (await processManager.Request(command)) { + case Result.Ok: { replyChannel.Reply(true); break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } return context; } - case (PurgeQueue(var name, var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case PurgeQueue(var name, var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); - switch (await processManager.PostAndReplyAsync(new QueuePurge(name, NoWait: false))) { - case QueuePurgeOk: { - replyChannel.Reply(true); + switch (await processManager.Request(new QueuePurge(name, NoWait: false))) { + case Result.Ok(var queuePurgeOk): { + replyChannel.Reply(queuePurgeOk.MessageCount); break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } return context; } - case (Publish(var exchange, var routingKey, var routingFlags, var message, var cancellationToken), AsyncReplyChannel replyChannel): { + case Publish(var exchange, var routingKey, var routingFlags, var message, var replyChannel, var cancellationToken): { var publishAgent = PublishAgent.Create( receivedFrames : receivedFrames, dispatcher : dispatcher, @@ -294,10 +280,8 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, cancellationToken : cancellationToken ); - var command = new PublishMessage(exchange, routingKey, routingFlags, message); - - switch (await publishAgent.PostAndReplyAsync(command)) { - case true: { + switch (await publishAgent.Publish(exchange, routingKey, routingFlags, message)) { + case Result.Ok: { replyChannel.Reply(true); if (enablePublisherConfirms) { deliveryTag += 1; @@ -305,67 +289,57 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, break; } - case Exception fault: { + case Result.Fault(var fault): { replyChannel.Reply(fault); break; } } return context; } - case (GetMessage(var queue, var acknowledgements, var cancellationToken), AsyncReplyChannel replyChannel): { + case GetMessage(var queue, var acknowledgements, var replyChannel, var cancellationToken): { var processManager = GetMessageAgent.Create( receivedFrames : receivedFrames, dispatcher : dispatcher, cancellationToken: cancellationToken ); - var command = new GetMessageAgent.Protocol.GetMessages(queue, acknowledgements); - - switch (await processManager.PostAndReplyAsync(command)) - { - case GetMessageAgent.Protocol.NoMessages: { - replyChannel.Reply(null); - break; - } - case (DeliveryInfo deliveryInfo, BasicProperties properties, ReadOnlyMemory body): { - replyChannel.Reply((deliveryInfo, properties, body)); - break; - } - case Exception fault: { - replyChannel.Reply(fault); - break; + try { + switch (await processManager.GetMessages(queue, acknowledgements)) { + case GetMessageAgent.NoMessage: { + replyChannel.Reply(null); + break; + } + case GetMessageAgent.Message(var deliveryInfo, var properties, var body): { + replyChannel.Reply((deliveryInfo, properties, body)); + break; + } } - }; + } + catch (Exception fault) { + replyChannel.Reply(fault); + } return context; } - case (Acknowledge(var deliveryTag, var multiple), AsyncReplyChannel replyChannel): { - var command = new BasicAck( + case Acknowledge(var deliveryTag, var multiple, var replyChannel): { + await dispatcher.Dispatch(new BasicAck( DeliveryTag: deliveryTag, Multiple : multiple - ); - - await dispatcher.PostAsync(Dispatch.Command(command)); + )); replyChannel.Reply(true); return context; } - case (Reject(var deliveryTag, var requeue), AsyncReplyChannel replyChannel): { - var command = new BasicReject( + case Reject(var deliveryTag, var requeue, var replyChannel): { + await dispatcher.Dispatch(new BasicReject( DeliveryTag: deliveryTag, ReQueue : requeue - ); - - await dispatcher.PostAsync(Dispatch.Command(command)); + )); replyChannel.Reply(true); return context; } - case (SetPrefetchLimit(var limit, var global, var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case SetPrefetchLimit(var limit, var global, var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); var command = new BasicQos( PrefetchSize : 0, @@ -373,36 +347,27 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, Global : global ); - switch (await processManager.PostAndReplyAsync(command)) { - case BasicQosOk: { + switch (await processManager.Request(command)) { + case Result.Ok: { replyChannel.Reply(true); break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } return context; } - case (Consume(var queue, var configuration, var arguments, var cancellationToken), AsyncReplyChannel replyChannel): { + case Consume(var queue, var configuration, var arguments, var replyChannel, var cancellationToken): { var consumerTag = $"{Guid.NewGuid()}"; var consumer = ConsumerAgent.Create(); - var command = new StartConsuming( - ConsumerTag : consumerTag, - ReceivedFrames : receivedFrames, - Dispatcher : dispatcher, - Queue : queue, - ConsumerConfiguration: configuration, - Arguments : arguments - ); - - switch (await consumer.PostAndReplyAsync(command)) { + switch (await consumer.StartConsuming(consumerTag, receivedFrames, dispatcher, queue, configuration, arguments)) { case true: { replyChannel.Reply(consumerTag); return context with { - Behaviour = Open(maxFrameSize, receivedFrames, dispatcher, consumers.Add(consumerTag, consumer)) + Behaviour = OpenBehaviour(maxFrameSize, receivedFrames, dispatcher, consumers.Add(consumerTag, consumer)) }; } case Exception fault: { @@ -412,21 +377,17 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, } return context; } - case (EnablePublisherConfirms(var cancellationToken), AsyncReplyChannel replyChannel): { - var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, - cancellationToken: cancellationToken - ); + case EnablePublisherConfirms(var replyChannel, var cancellationToken): { + var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); - switch (await processManager.PostAndReplyAsync(new ConfirmSelect(NoWait: false))) { - case ConfirmSelectOk: { + switch (await processManager.Request(new ConfirmSelect(NoWait: false))) { + case Result.Ok: { replyChannel.Reply(true); enablePublisherConfirms = true; break; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + replyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } @@ -435,4 +396,52 @@ static Behaviour Open(UInt32 maxFrameSize, IObservable receivedFrames, default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Open)}' behaviour."); } }; + + async Task IChannelAgent.Open(UInt16 channelId, IObservable frameStream, IObservable connectionEvents, ISocketAgent socketAgent, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new Open(channelId, frameStream, connectionEvents, socketAgent, replyChannel, cancellationToken)); + + async Task IChannelAgent.Close(CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new Close(replyChannel, cancellationToken)); + + async Task IChannelAgent.DeclareExchange(ExchangeDefinition definition, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new DeclareExchange(definition, replyChannel, cancellationToken)); + + async Task IChannelAgent.DeleteExchange(String exchange, DeleteExchangeCondition condition, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new DeleteExchange(exchange, condition, replyChannel, cancellationToken)); + + async Task IChannelAgent.DeclareQueue(QueueDefinition definition, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new DeclareQueue(definition, replyChannel, cancellationToken)); + + async Task IChannelAgent.DeleteQueue(String queue, DeleteQueueCondition condition, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new DeleteQueue(queue, condition, replyChannel, cancellationToken)); + + async Task IChannelAgent.BindQueue(Binding binding, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new BindQueue(binding, replyChannel, cancellationToken)); + + async Task IChannelAgent.UnbindQueue(Binding binding, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new UnbindQueue(binding, replyChannel, cancellationToken)); + + async Task IChannelAgent.PurgeQueue(String queue, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new PurgeQueue(queue, replyChannel, cancellationToken)); + + async Task IChannelAgent.Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties, ReadOnlyMemory) message, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new Publish(exchange, routingKey, routingFlags, message, replyChannel, cancellationToken)); + + async Task IChannelAgent.GetMessage(String queue, Acknowledgements acknowledgements, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new GetMessage(queue, acknowledgements, replyChannel, cancellationToken)); + + async Task IChannelAgent.Acknowledge(UInt64 deliveryTag, Boolean multiple) => + await _agent.PostAndReplyAsync(replyChannel => new Acknowledge(deliveryTag, multiple, replyChannel)); + + async Task IChannelAgent.Reject(UInt64 deliveryTag, Boolean requeue) => + await _agent.PostAndReplyAsync(replyChannel => new Reject(deliveryTag, requeue, replyChannel)); + + async Task IChannelAgent.SetPrefetchLimit(UInt16 limit, Boolean global, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new SetPrefetchLimit(limit, global, replyChannel, cancellationToken)); + + async Task IChannelAgent.Consume(String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments, CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new Consume(queue, consumerConfiguration, arguments, replyChannel, cancellationToken)); + + async Task IChannelAgent.EnablePublisherConfirms(CancellationToken cancellationToken) => + await _agent.PostAndReplyAsync(replyChannel => new EnablePublisherConfirms(replyChannel, cancellationToken)); } diff --git a/Lapine.Core/Agents/ConsumerAgent.cs b/Lapine.Core/Agents/ConsumerAgent.cs index ca37e30..396a25a 100644 --- a/Lapine.Core/Agents/ConsumerAgent.cs +++ b/Lapine.Core/Agents/ConsumerAgent.cs @@ -5,30 +5,38 @@ namespace Lapine.Agents; using Lapine.Protocol; using Lapine.Protocol.Commands; -using static Lapine.Agents.ConsumerAgent.Protocol; -using static Lapine.Agents.MessageAssemblerAgent.Protocol; -using static Lapine.Agents.MessageHandlerAgent.Protocol; - -static class ConsumerAgent { - static public class Protocol { - public record StartConsuming( - String ConsumerTag, - IObservable ReceivedFrames, - IAgent Dispatcher, - String Queue, - ConsumerConfiguration ConsumerConfiguration, - IReadOnlyDictionary? Arguments - ); - public record ConsumeMessage( - DeliveryInfo Delivery, - BasicProperties Properties, - MemoryBufferWriter Buffer - ); - public record Stop; - } - - static public IAgent Create() => - Agent.StartNew(Unstarted()); +interface IConsumerAgent { + Task StartConsuming(String consumerTag, IObservable frameStream, IDispatcherAgent dispatcherAgent, String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments = null); + Task HandlerReady(IMessageHandlerAgent handler); +} + +class ConsumerAgent : IConsumerAgent { + readonly IAgent _agent; + + ConsumerAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); + + abstract record Protocol; + record StartConsuming( + IConsumerAgent Self, + String ConsumerTag, + IObservable ReceivedFrames, + IDispatcherAgent Dispatcher, + String Queue, + ConsumerConfiguration ConsumerConfiguration, + IReadOnlyDictionary? Arguments, + AsyncReplyChannel ReplyChannel + ) : Protocol; + record ConsumeMessage( + DeliveryInfo Delivery, + BasicProperties Properties, + MemoryBufferWriter Buffer + ) : Protocol; + record HandlerReady(IMessageHandlerAgent Handler) : Protocol; + record Stop : Protocol; + + static public IConsumerAgent Create() => + new ConsumerAgent(Agent.StartNew(Unstarted())); readonly record struct Message( DeliveryInfo DeliveryInfo, @@ -36,22 +44,13 @@ readonly record struct Message( MemoryBufferWriter Body ); - static Behaviour Unstarted() => + static Behaviour Unstarted() => async context => { switch (context.Message) { - case (StartConsuming start, AsyncReplyChannel replyChannel): { - var handlers = Enumerable.Range(0, start.ConsumerConfiguration.MaxDegreeOfParallelism) - .Select(_ => MessageHandlerAgent.Create(context.Self)) - .ToImmutableQueue(); - var assembler = MessageAssemblerAgent.StartNew(); - await assembler.PostAsync(new Begin(start.ReceivedFrames, context.Self)); - var processManager = RequestReplyAgent.StartNew( - receivedFrames : start.ReceivedFrames, - dispatcher : start.Dispatcher, - cancellationToken: CancellationToken.None - ); + case StartConsuming start: { + var processManager = RequestReplyAgent.StartNew(start.ReceivedFrames, start.Dispatcher, CancellationToken.None); - var command = new BasicConsume( + var basicConsume = new BasicConsume( QueueName : start.Queue, ConsumerTag: start.ConsumerTag, NoLocal : false, @@ -64,27 +63,33 @@ static Behaviour Unstarted() => Arguments: start.Arguments ?? ImmutableDictionary.Empty ); - switch (await processManager.PostAndReplyAsync(command)) { - case BasicConsumeOk: { - replyChannel.Reply(true); + switch (await processManager.Request(basicConsume)) { + case Result.Ok(var basicConsumeOk): { + var handlers = Enumerable.Range(0, start.ConsumerConfiguration.MaxDegreeOfParallelism) + .Select(_ => MessageHandlerAgent.Create(start.Self)) + .ToImmutableQueue(); + var assembler = MessageAssemblerAgent.StartNew(); + var receivedMessages = await assembler.Begin(start.ReceivedFrames, start.Self); + receivedMessages.Subscribe(async message => await context.Self.PostAsync(new ConsumeMessage(message.DeliveryInfo, message.Properties, message.Buffer))); + + start.ReplyChannel.Reply(true); return context with { Behaviour = Running( - consumerTag : start.ConsumerTag, + consumerTag : basicConsumeOk.ConsumerTag, receivedFrames : start.ReceivedFrames, dispatcher : start.Dispatcher, assembler : assembler, consumerConfiguration: start.ConsumerConfiguration, availableHandlers : handlers, - busyHandlers : ImmutableList.Empty, + busyHandlers : ImmutableList.Empty, inbox : ImmutableQueue.Empty ) }; } - case Exception fault: { - replyChannel.Reply(fault); + case Result.Fault(var exceptionDispatchInfo): { + start.ReplyChannel.Reply(exceptionDispatchInfo.SourceException); break; } } - break; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Unstarted)}' behaviour."); @@ -92,18 +97,12 @@ static Behaviour Unstarted() => return context; }; - static Behaviour Running(String consumerTag, IObservable receivedFrames, IAgent dispatcher, IAgent assembler, ConsumerConfiguration consumerConfiguration, IImmutableQueue availableHandlers, IImmutableList busyHandlers, IImmutableQueue inbox) => + static Behaviour Running(String consumerTag, IObservable receivedFrames, IDispatcherAgent dispatcher, IMessageAssemblerAgent assembler, ConsumerConfiguration consumerConfiguration, IImmutableQueue availableHandlers, IImmutableList busyHandlers, IImmutableQueue inbox) => async context => { switch (context.Message) { case ConsumeMessage(var deliveryInfo, var properties, var buffer) when availableHandlers.Any(): { availableHandlers = availableHandlers.Dequeue(out var handler); - await handler.PostAsync(new HandleMessage( - Dispatcher : dispatcher, - ConsumerConfiguration: consumerConfiguration, - Delivery : deliveryInfo, - Properties : properties, - Buffer : buffer - )); + await handler.HandleMessage(dispatcher, consumerConfiguration, deliveryInfo, properties, buffer); return context with { Behaviour = Running( consumerTag : consumerTag, @@ -147,13 +146,7 @@ await handler.PostAsync(new HandleMessage( case HandlerReady(var handler) when inbox.Any(): { inbox = inbox.Dequeue(out var message); - await handler.PostAsync(new HandleMessage( - Dispatcher : dispatcher, - ConsumerConfiguration: consumerConfiguration, - Delivery : message.DeliveryInfo, - Properties : message.Properties, - Buffer : message.Body - )); + await handler.HandleMessage(dispatcher, consumerConfiguration, message.DeliveryInfo, message.Properties, message.Body); return context with { Behaviour = Running( consumerTag : consumerTag, @@ -166,30 +159,40 @@ await handler.PostAsync(new HandleMessage( inbox : inbox ) }; } - case Protocol.Stop: { - var processManager = RequestReplyAgent.StartNew( + case Stop: { + var processManager = RequestReplyAgent.StartNew( receivedFrames : receivedFrames, dispatcher : dispatcher, cancellationToken: CancellationToken.None ); - switch (await processManager.PostAndReplyAsync(new BasicCancel(consumerTag, false))) { - case BasicCancelOk: { + switch (await processManager.Request(new BasicCancel(consumerTag, false))) { + case Result.Ok(var basicCancelOk): { foreach (var handlerAgent in availableHandlers) - await handlerAgent.StopAsync(); + await handlerAgent.Stop(); foreach (var handlerAgent in busyHandlers) - await handlerAgent.StopAsync(); + await handlerAgent.Stop(); - await assembler.PostAsync(new MessageAssemblerAgent.Protocol.Stop()); + await assembler.Stop(); await context.Self.StopAsync(); return context; } + case Result.Fault(var exceptionDispatchInfo): { + // TODO + break; + } } return context; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Running)}' behaviour."); } }; + + async Task IConsumerAgent.StartConsuming(String consumerTag, IObservable frameStream, IDispatcherAgent dispatcherAgent, String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments) => + await _agent.PostAndReplyAsync(replyChannel => new StartConsuming(this, consumerTag, frameStream, dispatcherAgent, queue, consumerConfiguration, arguments, replyChannel)); + + async Task IConsumerAgent.HandlerReady(IMessageHandlerAgent handler) => + await _agent.PostAsync(new HandlerReady(handler)); } diff --git a/Lapine.Core/Agents/DispatcherAgent.cs b/Lapine.Core/Agents/DispatcherAgent.cs index 24cf04d..e5301bb 100644 --- a/Lapine.Core/Agents/DispatcherAgent.cs +++ b/Lapine.Core/Agents/DispatcherAgent.cs @@ -3,34 +3,30 @@ namespace Lapine.Agents; using Lapine.Protocol; using Lapine.Protocol.Commands; -using static Lapine.Agents.DispatcherAgent.Protocol; -using static Lapine.Agents.SocketAgent.Protocol; - -static class DispatcherAgent { - static public class Protocol { - public record DispatchTo(IAgent SocketAgent, UInt16 ChannelId); - public record Dispatch(Object Entity) { - static public Dispatch ProtocolHeader(ProtocolHeader protocolHeader) => - new (protocolHeader); - - static public Dispatch Frame(RawFrame frame) => - new (frame); +interface IDispatcherAgent { + Task DispatchTo(ISocketAgent socketAgent, UInt16 channelId); + Task Dispatch(ProtocolHeader protocolHeader); + Task Dispatch(RawFrame frame); + Task Dispatch(ICommand command); + Task Dispatch(ContentHeader header); + Task Dispatch(ReadOnlyMemory body); + Task Stop(); +} - static public Dispatch Command(ICommand command) => - new (command); +class DispatcherAgent : IDispatcherAgent { + readonly IAgent _agent; - static public Dispatch ContentHeader(ContentHeader header) => - new (header); + DispatcherAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); - static public Dispatch ContentBody(ReadOnlyMemory body) => - new (body); - } - } + abstract record Protocol; + record DispatchTo(ISocketAgent SocketAgent, UInt16 ChannelId) : Protocol; + record Dispatch(Object Entity) : Protocol; - static public IAgent Create() => - Agent.StartNew(Ready); + static public IDispatcherAgent Create() => + new DispatcherAgent(Agent.StartNew(Ready)); - static async ValueTask Ready(MessageContext context) { + static async ValueTask> Ready(MessageContext context) { switch (context.Message) { case DispatchTo(var socketAgent, var channelId): { return context with { Behaviour = Dispatching(channelId, socketAgent) }; @@ -39,33 +35,54 @@ static async ValueTask Ready(MessageContext context) { } } - static Behaviour Dispatching(UInt16 channelId, IAgent socketAgent) => + static Behaviour Dispatching(UInt16 channelId, ISocketAgent socketAgent) => async context => { switch (context.Message) { case Dispatch { Entity: ProtocolHeader header }: { - await socketAgent.PostAsync(new Transmit(header)); + await socketAgent.Transmit(header); return context; } case Dispatch { Entity: RawFrame frame }: { - await socketAgent.PostAsync(new Transmit(frame)); + await socketAgent.Transmit(frame); return context; } case Dispatch { Entity: ICommand command }: { var frame = RawFrame.Wrap(in channelId, command); - await socketAgent.PostAsync(new Transmit(frame)); + await socketAgent.Transmit(frame); return context; } case Dispatch { Entity: ContentHeader contentHeader }: { var frame = RawFrame.Wrap(in channelId, contentHeader); - await socketAgent.PostAsync(new Transmit(frame)); + await socketAgent.Transmit(frame); return context; } case Dispatch { Entity: ReadOnlyMemory body }: { var frame = RawFrame.Wrap(in channelId, body.Span); - await socketAgent.PostAsync(new Transmit(frame)); + await socketAgent.Transmit(frame); return context; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Dispatching)}' behaviour."); } }; + + async Task IDispatcherAgent.DispatchTo(ISocketAgent socketAgent, UInt16 channelId) => + await _agent.PostAsync(new DispatchTo(socketAgent, channelId)); + + async Task IDispatcherAgent.Dispatch(ProtocolHeader protocolHeader) => + await _agent.PostAsync(new Dispatch(protocolHeader)); + + async Task IDispatcherAgent.Dispatch(RawFrame frame) => + await _agent.PostAsync(new Dispatch(frame)); + + async Task IDispatcherAgent.Dispatch(ICommand command) => + await _agent.PostAsync(new Dispatch(command)); + + async Task IDispatcherAgent.Dispatch(ContentHeader header) => + await _agent.PostAsync(new Dispatch(header)); + + async Task IDispatcherAgent.Dispatch(ReadOnlyMemory body) => + await _agent.PostAsync(new Dispatch(body)); + + async Task IDispatcherAgent.Stop() => + await _agent.StopAsync(); } diff --git a/Lapine.Core/Agents/GetMessageAgent.cs b/Lapine.Core/Agents/GetMessageAgent.cs index b70d3ad..c3c7ba4 100644 --- a/Lapine.Core/Agents/GetMessageAgent.cs +++ b/Lapine.Core/Agents/GetMessageAgent.cs @@ -4,36 +4,46 @@ namespace Lapine.Agents; using Lapine.Protocol; using Lapine.Protocol.Commands; -using static Lapine.Agents.DispatcherAgent.Protocol; -using static Lapine.Agents.GetMessageAgent.Protocol; +interface IGetMessageAgent { + Task GetMessages(String queue, Acknowledgements acknowledgements); +} -static class GetMessageAgent { - static public class Protocol { - public record GetMessages(String Queue, Acknowledgements Acknowledgements); - public record NoMessages; - } +class GetMessageAgent : IGetMessageAgent { + readonly IAgent _agent; + + GetMessageAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); + + public abstract record GetMessageResult; + public record NoMessage : GetMessageResult; + public record Message(DeliveryInfo DeliveryInfo, BasicProperties Properties, ReadOnlyMemory Body) : GetMessageResult; + + abstract record Protocol; + record GetMessage(String Queue, Acknowledgements Acknowledgements, AsyncReplyChannel ReplyChannel) : Protocol; + record FrameReceived(Object Frame) : Protocol; + record Timeout : Protocol; - static public IAgent Create(IObservable receivedFrames, IAgent dispatcher, CancellationToken cancellationToken) => - Agent.StartNew(AwaitingGetMessages(receivedFrames, dispatcher, cancellationToken)); + static public IGetMessageAgent Create(IObservable receivedFrames, IDispatcherAgent dispatcher, CancellationToken cancellationToken) => + new GetMessageAgent(Agent.StartNew(AwaitingGetMessages(receivedFrames, dispatcher, cancellationToken))); - static Behaviour AwaitingGetMessages(IObservable receivedFrames, IAgent dispatcher, CancellationToken cancellationToken) => + static Behaviour AwaitingGetMessages(IObservable receivedFrames, IDispatcherAgent dispatcher, CancellationToken cancellationToken) => async context => { switch (context.Message) { - case (GetMessages(var queue, var acknowledgements), AsyncReplyChannel replyChannel): { + case GetMessage(var queue, var acknowledgements, var replyChannel): { var subscription = receivedFrames.Subscribe(onNext: frame => { - context.Self.PostAsync(RawFrame.Unwrap(frame)); + context.Self.PostAsync(new FrameReceived(RawFrame.Unwrap(frame))); }); - await dispatcher.PostAsync(Dispatch.Command(new BasicGet( + await dispatcher.Dispatch(new BasicGet( QueueName: queue, NoAck: acknowledgements switch { Acknowledgements.Auto => true, Acknowledgements.Manual => false, _ => false } - ))); + )); - var cancelTimeout = cancellationToken.Register(() => context.Self.PostAsync(new TimeoutException())); + var cancelTimeout = cancellationToken.Register(() => context.Self.PostAsync(new Timeout())); return context with { Behaviour = AwaitingBasicGetOkOrEmpty(subscription, cancelTimeout, replyChannel) @@ -43,29 +53,29 @@ await dispatcher.PostAsync(Dispatch.Command(new BasicGet( } }; - static Behaviour AwaitingBasicGetOkOrEmpty(IDisposable subscription, CancellationTokenRegistration cancelTimeout, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingBasicGetOkOrEmpty(IDisposable subscription, CancellationTokenRegistration cancelTimeout, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { - case BasicGetEmpty: { + case FrameReceived(BasicGetEmpty): { await cancelTimeout.DisposeAsync(); - replyChannel.Reply(new NoMessages()); + replyChannel.Reply(new NoMessage()); await context.Self.StopAsync(); subscription.Dispose(); return context; } - case BasicGetOk ok: { + case FrameReceived(BasicGetOk ok): { var deliveryInfo = DeliveryInfo.FromBasicGetOk(ok); return context with { Behaviour = AwaitingContentHeader(subscription, cancelTimeout, deliveryInfo, replyChannel) }; } - case TimeoutException timeout: { - replyChannel.Reply(timeout); + case Timeout: { + replyChannel.Reply(new TimeoutException()); await context.Self.StopAsync(); subscription.Dispose(); return context; } - case ChannelClose close: { + case FrameReceived(ChannelClose close): { await cancelTimeout.DisposeAsync(); replyChannel.Reply(AmqpException.Create(close.ReplyCode, close.ReplyText)); await context.Self.StopAsync(); @@ -76,28 +86,28 @@ static Behaviour AwaitingBasicGetOkOrEmpty(IDisposable subscription, Cancellatio } }; - static Behaviour AwaitingContentHeader(IDisposable subscription, CancellationTokenRegistration cancelTimeout, DeliveryInfo deliveryInfo, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingContentHeader(IDisposable subscription, CancellationTokenRegistration cancelTimeout, DeliveryInfo deliveryInfo, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { - case ContentHeader { BodySize: 0 } header: { + case FrameReceived(ContentHeader { BodySize: 0 } header): { await cancelTimeout.DisposeAsync(); - replyChannel.Reply((deliveryInfo, header.Properties, Memory.Empty)); + replyChannel.Reply(new Message(deliveryInfo, header.Properties, Memory.Empty)); await context.Self.StopAsync(); subscription.Dispose(); return context; } - case ContentHeader { BodySize: > 0 } header: { + case FrameReceived(ContentHeader { BodySize: > 0 } header): { return context with { Behaviour = AwaitingContentBody(subscription, cancelTimeout, deliveryInfo, header, replyChannel) }; } - case TimeoutException timeout: { + case FrameReceived(TimeoutException timeout): { replyChannel.Reply(timeout); await context.Self.StopAsync(); subscription.Dispose(); return context; } - case ChannelClose close: { + case FrameReceived(ChannelClose close): { await cancelTimeout.DisposeAsync(); replyChannel.Reply(AmqpException.Create(close.ReplyCode, close.ReplyText)); await context.Self.StopAsync(); @@ -108,23 +118,23 @@ static Behaviour AwaitingContentHeader(IDisposable subscription, CancellationTok } }; - static Behaviour AwaitingContentBody(IDisposable subscription, CancellationTokenRegistration cancelTimeout, DeliveryInfo deliveryInfo, ContentHeader header, AsyncReplyChannel replyChannel) { + static Behaviour AwaitingContentBody(IDisposable subscription, CancellationTokenRegistration cancelTimeout, DeliveryInfo deliveryInfo, ContentHeader header, AsyncReplyChannel replyChannel) { var buffer = new MemoryBufferWriter((Int32)header.BodySize); return async context => { switch (context.Message) { - case ReadOnlyMemory segment: { + case FrameReceived(ReadOnlyMemory segment): { buffer.WriteBytes(segment.Span); if ((UInt64)buffer.WrittenCount >= header.BodySize) { await cancelTimeout.DisposeAsync(); - replyChannel.Reply((deliveryInfo, header.Properties, buffer.WrittenMemory)); + replyChannel.Reply(new Message(deliveryInfo, header.Properties, buffer.WrittenMemory)); await context.Self.StopAsync(); subscription.Dispose(); } return context; } - case TimeoutException timeout: { - replyChannel.Reply(timeout); + case Timeout: { + replyChannel.Reply(new TimeoutException()); await context.Self.StopAsync(); subscription.Dispose(); return context; @@ -133,4 +143,15 @@ static Behaviour AwaitingContentBody(IDisposable subscription, CancellationToken } }; } + + async Task IGetMessageAgent.GetMessages(String queue, Acknowledgements acknowledgements) { + switch (await _agent.PostAndReplyAsync(replyChannel => new GetMessage(queue, acknowledgements, replyChannel))) { + case GetMessageResult reply: + return reply; + case Exception fault: + throw fault; + default: + throw new Exception("Unexpected return value"); + } + } } diff --git a/Lapine.Core/Agents/HandshakeAgent.cs b/Lapine.Core/Agents/HandshakeAgent.cs index 33d62e2..f3eaec3 100644 --- a/Lapine.Core/Agents/HandshakeAgent.cs +++ b/Lapine.Core/Agents/HandshakeAgent.cs @@ -7,32 +7,44 @@ namespace Lapine.Agents; using static System.Math; using static System.Text.Encoding; -using static Lapine.Agents.DispatcherAgent.Protocol; -using static Lapine.Agents.HandshakeAgent.Protocol; -using static Lapine.Agents.SocketAgent.Protocol; -static class HandshakeAgent { - static public class Protocol { - public record StartHandshake(ConnectionConfiguration ConnectionConfiguration); - } +interface IHandshakeAgent { + Task StartHandshake(ConnectionConfiguration connectionConfiguration); +} + +class HandshakeAgent : IHandshakeAgent { + readonly IAgent _agent; + + HandshakeAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); + + public abstract record HandshakeResult; + public record ConnectionAgreed(ConnectionAgreement Agreement) : HandshakeResult; + public record HandshakeFailed(Exception Fault) : HandshakeResult; - static public IAgent Create(IObservable receivedFrames, IObservable connectionEvents, IAgent dispatcher, CancellationToken cancellationToken) => - Agent.StartNew(Unstarted(receivedFrames, connectionEvents, dispatcher, cancellationToken)); + abstract record Protocol; + record StartHandshake(ConnectionConfiguration ConnectionConfiguration, AsyncReplyChannel ReplyChannel) : Protocol; + record FrameReceived(ICommand Frame) : Protocol; + record Timeout(TimeoutException Exception) : Protocol; + record ConnectionEventReceived(Object Message) : Protocol; - static Behaviour Unstarted(IObservable receivedFrames, IObservable connectionEvents, IAgent dispatcher, CancellationToken cancellationToken) => + static public IHandshakeAgent Create(IObservable receivedFrames, IObservable connectionEvents, IDispatcherAgent dispatcher, CancellationToken cancellationToken) => + new HandshakeAgent(Agent.StartNew(Unstarted(receivedFrames, connectionEvents, dispatcher, cancellationToken))); + + static Behaviour Unstarted(IObservable receivedFrames, IObservable connectionEvents, IDispatcherAgent dispatcher, CancellationToken cancellationToken) => async context => { switch (context.Message) { - case (StartHandshake(var connectionConfiguration), AsyncReplyChannel replyChannel): { + case StartHandshake(var connectionConfiguration, var replyChannel): { var framesSubscription = receivedFrames .Where(frame => frame.Channel == 0) .Where(frame => frame.Type == FrameType.Method) - .Subscribe(frame => context.Self.PostAsync(RawFrame.UnwrapMethod(frame))); + .Subscribe(frame => context.Self.PostAsync(new FrameReceived(RawFrame.UnwrapMethod(frame)))); - var connectionEventsSubscription = connectionEvents.Subscribe(message => context.Self.PostAsync(message)); + var connectionEventsSubscription = connectionEvents.Subscribe(message => context.Self.PostAsync(new ConnectionEventReceived(message))); - var scheduledTimeout = cancellationToken.Register(() => context.Self.PostAsync(new TimeoutException("A connection to the broker was established but the negotiation did not complete within the specified connection timeout limit."))); + var scheduledTimeout = cancellationToken.Register(() => context.Self.PostAsync(new Timeout(new TimeoutException("A connection to the broker was established but the negotiation did not complete within the specified connection timeout limit.")))); - await dispatcher.PostAsync(Dispatch.ProtocolHeader(ProtocolHeader.Default)); + await dispatcher.Dispatch(ProtocolHeader.Default); return context with { Behaviour = AwaitingConnectionStart(connectionConfiguration, scheduledTimeout, framesSubscription, connectionEventsSubscription, dispatcher, replyChannel) @@ -42,43 +54,43 @@ static Behaviour Unstarted(IObservable receivedFrames, IObservable + static Behaviour AwaitingConnectionStart(ConnectionConfiguration connectionConfiguration, CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventsSubscription, IDispatcherAgent dispatcher, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { - case ConnectionStart message when !message.Mechanisms.Contains(connectionConfiguration.AuthenticationStrategy.Mechanism): { + case FrameReceived(ConnectionStart message) when !message.Mechanisms.Contains(connectionConfiguration.AuthenticationStrategy.Mechanism): { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new Exception($"Requested authentication mechanism '{connectionConfiguration.AuthenticationStrategy.Mechanism}' is not supported by the broker. This broker supports {String.Join(", ", message.Mechanisms)}")); + replyChannel.Reply(new HandshakeFailed(new Exception($"Requested authentication mechanism '{connectionConfiguration.AuthenticationStrategy.Mechanism}' is not supported by the broker. This broker supports {String.Join(", ", message.Mechanisms)}"))); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); return context; } - case ConnectionStart(var version, var serverProperties, var mechanisms, var locales) when !locales.Contains(connectionConfiguration.Locale): { + case FrameReceived(ConnectionStart(var version, var serverProperties, var mechanisms, var locales)) when !locales.Contains(connectionConfiguration.Locale): { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new Exception($"Requested locale '{connectionConfiguration.Locale}' is not supported by the broker. This broker supports {String.Join(", ", locales)}")); + replyChannel.Reply(new HandshakeFailed(new Exception($"Requested locale '{connectionConfiguration.Locale}' is not supported by the broker. This broker supports {String.Join(", ", locales)}"))); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); return context; } - case TimeoutException timeout: { - replyChannel.Reply(timeout); + case Timeout(var exception): { + replyChannel.Reply(new HandshakeFailed(exception)); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); return context; } - case ConnectionStart(var version, var serverProperties, var mechanisms, var locales): { + case FrameReceived(ConnectionStart(var version, var serverProperties, var mechanisms, var locales)): { var authenticationResponse = connectionConfiguration.AuthenticationStrategy.Respond( stage : 0, challenge: Span.Empty ); - await dispatcher.PostAsync(Dispatch.Command(new ConnectionStartOk( + await dispatcher.Dispatch(new ConnectionStartOk( PeerProperties: connectionConfiguration.PeerProperties.ToDictionary(), Mechanism : connectionConfiguration.AuthenticationStrategy.Mechanism, Response : UTF8.GetString(authenticationResponse), Locale : connectionConfiguration.Locale - ))); + )); return context with { Behaviour = AwaitingConnectionSecureOrTune(connectionConfiguration, scheduledTimeout, frameSubscription, connectionEventsSubscription, 0, serverProperties, dispatcher, replyChannel) }; @@ -87,49 +99,49 @@ await dispatcher.PostAsync(Dispatch.Command(new ConnectionStartOk( } }; - static Behaviour AwaitingConnectionSecureOrTune(ConnectionConfiguration connectionConfiguration, CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventsSubscription, Byte authenticationStage, IReadOnlyDictionary serverProperties, IAgent dispatcher, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingConnectionSecureOrTune(ConnectionConfiguration connectionConfiguration, CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventsSubscription, Byte authenticationStage, IReadOnlyDictionary serverProperties, IDispatcherAgent dispatcher, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { - case RemoteDisconnected(var fault): { - replyChannel.Reply(fault); + case ConnectionEventReceived(RemoteDisconnected(var fault)): { + replyChannel.Reply(new HandshakeFailed(fault)); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); return context; } - case TimeoutException timeout: { - replyChannel.Reply(timeout); + case Timeout(var exception): { + replyChannel.Reply(new HandshakeFailed(exception)); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); return context; } - case ConnectionSecure(var challenge): { + case FrameReceived(ConnectionSecure(var challenge)): { var challengeBytes = UTF8.GetBytes(challenge); var authenticationResponse = connectionConfiguration.AuthenticationStrategy.Respond( stage : ++authenticationStage, challenge: challengeBytes ); - await dispatcher.PostAsync(Dispatch.Command(new ConnectionSecureOk( + await dispatcher.Dispatch(new ConnectionSecureOk( Response: UTF8.GetString(authenticationResponse) - ))); + )); return context with { Behaviour = AwaitingConnectionSecureOrTune(connectionConfiguration, scheduledTimeout, frameSubscription, connectionEventsSubscription, authenticationStage, serverProperties, dispatcher, replyChannel) }; } - case ConnectionTune(var channelMax, var frameMax, var heartbeat): { + case FrameReceived(ConnectionTune(var channelMax, var frameMax, var heartbeat)): { var heartbeatFrequency = Min(heartbeat, (UInt16)connectionConfiguration.ConnectionIntegrityStrategy.HeartbeatFrequency.GetValueOrDefault().TotalSeconds); var maxFrameSize = Min(frameMax, connectionConfiguration.MaximumFrameSize); var maxChannelCount = Min(channelMax, connectionConfiguration.MaximumChannelCount); - await dispatcher.PostAsync(Dispatch.Command(new ConnectionTuneOk( + await dispatcher.Dispatch(new ConnectionTuneOk( ChannelMax: maxChannelCount, FrameMax : maxFrameSize, Heartbeat : heartbeatFrequency - ))); - await dispatcher.PostAsync(Dispatch.Command(new ConnectionOpen( + )); + await dispatcher.Dispatch(new ConnectionOpen( VirtualHost: connectionConfiguration.VirtualHost - ))); + )); return context with { Behaviour = AwaitingConnectionOpenOk(scheduledTimeout, frameSubscription, connectionEventsSubscription, maxChannelCount, maxFrameSize, heartbeatFrequency, serverProperties, replyChannel) }; @@ -138,24 +150,24 @@ await dispatcher.PostAsync(Dispatch.Command(new ConnectionOpen( } }; - static Behaviour AwaitingConnectionOpenOk(CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventSubscription, UInt16 maxChannelCount, UInt32 maxFrameSize, UInt16 heartbeatFrequency, IReadOnlyDictionary serverProperties, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingConnectionOpenOk(CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventSubscription, UInt16 maxChannelCount, UInt32 maxFrameSize, UInt16 heartbeatFrequency, IReadOnlyDictionary serverProperties, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { - case TimeoutException timeout: { - replyChannel.Reply(timeout); + case Timeout(var exception): { + replyChannel.Reply(new HandshakeFailed(exception)); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventSubscription.Dispose(); return context; } - case ConnectionOpenOk: { + case FrameReceived(ConnectionOpenOk): { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new ConnectionAgreement( + replyChannel.Reply(new ConnectionAgreed(new ConnectionAgreement( MaxChannelCount : maxChannelCount, MaxFrameSize : maxFrameSize, HeartbeatFrequency: TimeSpan.FromSeconds(heartbeatFrequency), ServerProperties : serverProperties - )); + ))); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventSubscription.Dispose(); @@ -164,4 +176,9 @@ static Behaviour AwaitingConnectionOpenOk(CancellationTokenRegistration schedule default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(AwaitingConnectionOpenOk)}' behaviour."); } }; + + async Task IHandshakeAgent.StartHandshake(ConnectionConfiguration connectionConfiguration) { + var reply = await _agent.PostAndReplyAsync(replyChannel => new StartHandshake(connectionConfiguration, replyChannel)); + return (HandshakeResult) reply; + } } diff --git a/Lapine.Core/Agents/HeartbeatAgent.cs b/Lapine.Core/Agents/HeartbeatAgent.cs index 685854e..323034b 100644 --- a/Lapine.Core/Agents/HeartbeatAgent.cs +++ b/Lapine.Core/Agents/HeartbeatAgent.cs @@ -5,40 +5,50 @@ namespace Lapine.Agents; using Lapine.Protocol; using static System.DateTime; -using static Lapine.Agents.DispatcherAgent.Protocol; -using static Lapine.Agents.HeartbeatAgent.Protocol; - -static class HeartbeatAgent { - static public class Protocol { - public record StartHeartbeat(IObservable ReceivedFrames, IAgent Dispatcher, TimeSpan Frequency); - public record RemoteFlatline; - internal record Beat; - } - static public IAgent Create() => - Agent.StartNew(Idle()); +public abstract record HeartbeatEvent; +public record RemoteFlatline : HeartbeatEvent; + +interface IHeartbeatAgent { + Task> Start(IObservable frameStream, IDispatcherAgent dispatcher, TimeSpan frequency); + Task Stop(); +} + +class HeartbeatAgent : IHeartbeatAgent { + readonly IAgent _agent; + + HeartbeatAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); + + abstract record Protocol; + record StartHeartbeat(IObservable ReceivedFrames, IDispatcherAgent Dispatcher, TimeSpan Frequency, AsyncReplyChannel ReplyChannel) : Protocol; + record Beat : Protocol; + record FrameReceived(RawFrame Frame) : Protocol; + + static public IHeartbeatAgent Create() => + new HeartbeatAgent(Agent.StartNew(Idle())); readonly record struct State( IDisposable Subscription, TimeSpan HeartbeatFrequency, - IAgent Dispatcher, - Subject HeartbeatEvents, + IDispatcherAgent Dispatcher, + Subject HeartbeatEvents, DateTime LastRemoteHeartbeat ); - static Behaviour Idle() => + static Behaviour Idle() => async context => { switch (context.Message) { - case (StartHeartbeat(var receivedFrames, var dispatcher, var frequency), AsyncReplyChannel replyChannel): { + case StartHeartbeat(var receivedFrames, var dispatcher, var frequency, var replyChannel): { if (frequency == TimeSpan.Zero) return context; var subscription = receivedFrames .Where(frame => frame.Channel == 0) .Where(frame => frame.Type == FrameType.Heartbeat) - .Subscribe(message => context.Self.PostAsync(message)); + .Subscribe(message => context.Self.PostAsync(new FrameReceived(message))); - var heartbeatEvents = new Subject(); + var heartbeatEvents = new Subject(); var cts = new CancellationTokenSource(frequency); cts.Token.Register(() => context.Self.PostAsync(new Beat())); @@ -59,20 +69,20 @@ static Behaviour Idle() => } }; - static Behaviour Beating(State state) => + static Behaviour Beating(State state) => async context => { switch (context.Message) { case Beat: { if ((UtcNow - state.LastRemoteHeartbeat) > (state.HeartbeatFrequency * 3)) { state.HeartbeatEvents.OnNext(new RemoteFlatline()); } - await state.Dispatcher.PostAsync(Dispatch.Frame(RawFrame.Heartbeat)); + await state.Dispatcher.Dispatch(RawFrame.Heartbeat); var cts = new CancellationTokenSource(state.HeartbeatFrequency); cts.Token.Register(() => context.Self.PostAsync(new Beat())); return context; } - case RawFrame { Type: FrameType.Heartbeat }: { + case FrameReceived({ Type: FrameType.Heartbeat }): { return context with { Behaviour = Beating(state with { LastRemoteHeartbeat = UtcNow @@ -82,4 +92,12 @@ static Behaviour Beating(State state) => default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Beating)}' behaviour."); } }; + + async Task> IHeartbeatAgent.Start(IObservable frameStream, IDispatcherAgent dispatcher, TimeSpan frequency) { + var reply = await _agent.PostAndReplyAsync(replyChannel => new StartHeartbeat(frameStream, dispatcher, frequency, replyChannel)); + return (IObservable) reply; + } + + async Task IHeartbeatAgent.Stop() => + await _agent.StopAsync(); } diff --git a/Lapine.Core/Agents/MessageAssemblerAgent.cs b/Lapine.Core/Agents/MessageAssemblerAgent.cs index 8f8eb23..a179b7b 100644 --- a/Lapine.Core/Agents/MessageAssemblerAgent.cs +++ b/Lapine.Core/Agents/MessageAssemblerAgent.cs @@ -1,97 +1,108 @@ -using System.Reactive; - namespace Lapine.Agents; using System.Reactive.Linq; +using System.Reactive.Subjects; using Lapine.Client; using Lapine.Protocol; using Lapine.Protocol.Commands; -using static Lapine.Agents.ConsumerAgent.Protocol; -using static Lapine.Agents.MessageAssemblerAgent.Protocol; +interface IMessageAssemblerAgent { + Task Buffer)>> Begin(IObservable frameStream, IConsumerAgent parent); + Task Stop(); +} -static class MessageAssemblerAgent { - static public class Protocol { - public record Begin(IObservable Frames, IAgent Parent); - public record Stop; - } +class MessageAssemblerAgent : IMessageAssemblerAgent { + readonly IAgent _agent; + + MessageAssemblerAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); + + abstract record Protocol; + record Begin(IObservable Frames, AsyncReplyChannel ReplyChannel) : Protocol; + record Stop : Protocol; + record FrameReceived(Object Frame) : Protocol; - static public IAgent StartNew() => - Agent.StartNew(Unstarted()); + static public IMessageAssemblerAgent StartNew() => + new MessageAssemblerAgent(Agent.StartNew(Unstarted())); - static Behaviour Unstarted() => + static Behaviour Unstarted() => async context => { switch (context.Message) { case Begin(var frames, var parent): { var frameSubscription = frames .Select(frame => RawFrame.Unwrap(frame)) - .Subscribe(message => context.Self.PostAsync(message)); + .Subscribe(message => context.Self.PostAsync(new FrameReceived(message))); - return context with {Behaviour = AwaitingBasicDeliver(parent, frameSubscription)}; + var receivedMessages = new Subject<(DeliveryInfo, BasicProperties, MemoryBufferWriter)>(); + + return context with { Behaviour = AwaitingBasicDeliver(receivedMessages, frameSubscription) }; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Unstarted)}' behaviour."); } }; - static Behaviour AwaitingBasicDeliver(IAgent listener, IDisposable frameSubscription) => + static Behaviour AwaitingBasicDeliver(Subject<(DeliveryInfo, BasicProperties, MemoryBufferWriter)> receivedMessages, IDisposable frameSubscription) => async context => { switch (context.Message) { - case BasicDeliver deliver: { - return context with { Behaviour = AwaitingContentHeader(listener, frameSubscription, DeliveryInfo.FromBasicDeliver(deliver)) }; + case FrameReceived(BasicDeliver deliver): { + return context with { Behaviour = AwaitingContentHeader(receivedMessages, frameSubscription, DeliveryInfo.FromBasicDeliver(deliver)) }; } - case Protocol.Stop: { + case Stop: { frameSubscription.Dispose(); + await context.Self.StopAsync(); return context; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(AwaitingBasicDeliver)}' behaviour."); } }; - static Behaviour AwaitingContentHeader(IAgent listener, IDisposable frameSubscription, DeliveryInfo deliveryInfo) => + static Behaviour AwaitingContentHeader(Subject<(DeliveryInfo, BasicProperties, MemoryBufferWriter)> receivedMessages, IDisposable frameSubscription, DeliveryInfo deliveryInfo) => async context => { switch (context.Message) { - case ContentHeader { BodySize: 0 } header: { - await listener.PostAsync(new ConsumeMessage( - Delivery : deliveryInfo, - Properties: header.Properties, - Buffer : new MemoryBufferWriter() - )); - return context with { Behaviour = AwaitingBasicDeliver(listener, frameSubscription) }; + case FrameReceived(ContentHeader { BodySize: 0 } header): { + receivedMessages.OnNext((deliveryInfo, header.Properties, new MemoryBufferWriter())); + return context with { Behaviour = AwaitingBasicDeliver(receivedMessages, frameSubscription) }; } - case ContentHeader header: { - return context with { Behaviour = AwaitingContentBody(listener, frameSubscription, deliveryInfo, header) }; + case FrameReceived(ContentHeader header): { + return context with { Behaviour = AwaitingContentBody(receivedMessages, frameSubscription, deliveryInfo, header) }; } - case Protocol.Stop: { + case Stop: { frameSubscription.Dispose(); + await context.Self.StopAsync(); return context; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(AwaitingContentHeader)}' behaviour."); } }; - static Behaviour AwaitingContentBody(IAgent listener, IDisposable frameSubscription, DeliveryInfo deliveryInfo, ContentHeader header) { + static Behaviour AwaitingContentBody(Subject<(DeliveryInfo, BasicProperties, MemoryBufferWriter)> receivedMessages, IDisposable frameSubscription, DeliveryInfo deliveryInfo, ContentHeader header) { var writer = new MemoryBufferWriter((Int32)header.BodySize); return async context => { switch (context.Message) { - case ReadOnlyMemory segment: { + case FrameReceived(ReadOnlyMemory segment): { writer.WriteBytes(segment.Span); if ((UInt64)writer.WrittenCount >= header.BodySize) { - await listener.PostAsync(new ConsumeMessage( - Delivery : deliveryInfo, - Properties: header.Properties, - Buffer : writer - )); - return context with { Behaviour = AwaitingBasicDeliver(listener, frameSubscription) }; + receivedMessages.OnNext((deliveryInfo, header.Properties, writer)); + return context with { Behaviour = AwaitingBasicDeliver(receivedMessages, frameSubscription) }; } return context; } - case Protocol.Stop: { + case Stop: { frameSubscription.Dispose(); + await context.Self.StopAsync(); return context; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(AwaitingContentBody)}' behaviour."); } }; } + + async Task Buffer)>> IMessageAssemblerAgent.Begin(IObservable frameStream, IConsumerAgent parent) { + var reply = await _agent.PostAndReplyAsync(replyChannel => new Begin(frameStream, replyChannel)); + return (IObservable<(DeliveryInfo DeliveryInfo, BasicProperties Properties, MemoryBufferWriter Buffer)>) reply; + } + + async Task IMessageAssemblerAgent.Stop() => + await _agent.PostAsync(new Stop()); } diff --git a/Lapine.Core/Agents/MessageContext.cs b/Lapine.Core/Agents/MessageContext.cs index 7ca481b..53084c3 100644 --- a/Lapine.Core/Agents/MessageContext.cs +++ b/Lapine.Core/Agents/MessageContext.cs @@ -1,3 +1,3 @@ namespace Lapine.Agents; -readonly record struct MessageContext(IAgent Self, Behaviour Behaviour, Object Message); +readonly record struct MessageContext(IAgent Self, Behaviour Behaviour, TProtocol Message); diff --git a/Lapine.Core/Agents/MessageHandlerAgent.cs b/Lapine.Core/Agents/MessageHandlerAgent.cs index 66938da..462394d 100644 --- a/Lapine.Core/Agents/MessageHandlerAgent.cs +++ b/Lapine.Core/Agents/MessageHandlerAgent.cs @@ -4,58 +4,64 @@ namespace Lapine.Agents; using Lapine.Protocol; using Lapine.Protocol.Commands; -using static Lapine.Agents.MessageHandlerAgent.Protocol; -using static Lapine.Agents.DispatcherAgent.Protocol; +interface IMessageHandlerAgent { + Task HandleMessage(IDispatcherAgent dispatcher, ConsumerConfiguration consumerConfiguration, DeliveryInfo deliveryInfo, BasicProperties properties, MemoryBufferWriter buffer); + Task Stop(); +} + +class MessageHandlerAgent : IMessageHandlerAgent { + readonly IAgent _agent; + + MessageHandlerAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); -static class MessageHandlerAgent { - static public class Protocol { - public record HandleMessage( - IAgent Dispatcher, - ConsumerConfiguration ConsumerConfiguration, - DeliveryInfo Delivery, - BasicProperties Properties, - MemoryBufferWriter Buffer - ); - public record HandlerReady(IAgent Handler); - } + static public IMessageHandlerAgent Create(IConsumerAgent parent) => + new MessageHandlerAgent(Agent.StartNew(Main(parent))); - static public IAgent Create(IAgent parent) => - Agent.StartNew(Main(parent)); + abstract record Protocol; + record HandleMessage( + IMessageHandlerAgent Self, + IDispatcherAgent Dispatcher, + ConsumerConfiguration ConsumerConfiguration, + DeliveryInfo Delivery, + BasicProperties Properties, + MemoryBufferWriter Buffer + ) : Protocol; - static Behaviour Main(IAgent parent) => async context => { + static Behaviour Main(IConsumerAgent parent) => async context => { switch (context.Message) { - case HandleMessage(var dispatcher, var consumerConfiguration, var deliveryInfo, var properties, var buffer): { + case HandleMessage(var self, var dispatcher, var consumerConfiguration, var deliveryInfo, var properties, var buffer): { try { await consumerConfiguration.Handler( deliveryInfo: deliveryInfo, properties : MessageProperties.FromBasicProperties(properties), body : buffer.WrittenMemory ); - await dispatcher.PostAsync(Dispatch.Command(new BasicAck( + await dispatcher.Dispatch(new BasicAck( DeliveryTag: deliveryInfo.DeliveryTag, Multiple : false - ))); + )); } catch (MessageException) { // nack without requeue... - await dispatcher.PostAsync(Dispatch.Command(new BasicReject( + await dispatcher.Dispatch(new BasicReject( DeliveryTag: deliveryInfo.DeliveryTag, ReQueue : false - ))); + )); } catch (ConsumerException) { // nack with requeue... - await dispatcher.PostAsync(Dispatch.Command(new BasicReject( + await dispatcher.Dispatch(new BasicReject( DeliveryTag: deliveryInfo.DeliveryTag, ReQueue : true - ))); + )); } finally { // Release the buffer containing the message body back into the memory pool... buffer.Dispose(); // Tell consumer agent we're ready to handle another message... - await parent.PostAsync(new HandlerReady(context.Self)); + await parent.HandlerReady(self); } return context; @@ -63,4 +69,10 @@ await dispatcher.PostAsync(Dispatch.Command(new BasicReject( default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Main)}' behaviour."); } }; + + async Task IMessageHandlerAgent.HandleMessage(IDispatcherAgent dispatcher, ConsumerConfiguration consumerConfiguration, DeliveryInfo deliveryInfo, BasicProperties properties, MemoryBufferWriter buffer) => + await _agent.PostAsync(new HandleMessage(this, dispatcher, consumerConfiguration, deliveryInfo, properties, buffer)); + + async Task IMessageHandlerAgent.Stop() => + await _agent.StopAsync(); } diff --git a/Lapine.Core/Agents/PublishAgent.cs b/Lapine.Core/Agents/PublishAgent.cs index 74c05de..9f59897 100644 --- a/Lapine.Core/Agents/PublishAgent.cs +++ b/Lapine.Core/Agents/PublishAgent.cs @@ -1,52 +1,59 @@ -using System.Reactive.Linq; - namespace Lapine.Agents; +using System.Reactive.Linq; +using System.Runtime.ExceptionServices; using Lapine.Client; using Lapine.Protocol; using Lapine.Protocol.Commands; -using static Lapine.Agents.DispatcherAgent.Protocol; -using static Lapine.Agents.PublishAgent.Protocol; +interface IPublishAgent { + Task> Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties Properties, ReadOnlyMemory Body) message); +} -static class PublishAgent { - static public class Protocol { - public record PublishMessage(String Exchange, String RoutingKey, RoutingFlags RoutingFlags, (BasicProperties Properties, ReadOnlyMemory Body) Message); - } +class PublishAgent : IPublishAgent { + readonly IAgent _agent; - static public IAgent Create(IObservable receivedFrames, IAgent dispatcher, UInt64 maxFrameSize, Boolean publisherConfirmsEnabled, UInt64 deliveryTag, CancellationToken cancellationToken) => - Agent.StartNew(Unstarted(receivedFrames, dispatcher, maxFrameSize, publisherConfirmsEnabled, deliveryTag, cancellationToken)); + PublishAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); - static Behaviour Unstarted(IObservable receivedFrames, IAgent dispatcher, UInt64 maxFrameSize, Boolean publisherConfirmsEnabled, UInt64 deliveryTag, CancellationToken cancellationToken) => + abstract record Protocol; + record PublishMessage(String Exchange, String RoutingKey, RoutingFlags RoutingFlags, (BasicProperties Properties, ReadOnlyMemory Body) Message, AsyncReplyChannel ReplyChannel) : Protocol; + record Timeout : Protocol; + record FrameReceived(ICommand Command) : Protocol; + + static public IPublishAgent Create(IObservable receivedFrames, IDispatcherAgent dispatcher, UInt64 maxFrameSize, Boolean publisherConfirmsEnabled, UInt64 deliveryTag, CancellationToken cancellationToken) => + new PublishAgent(Agent.StartNew(Unstarted(receivedFrames, dispatcher, maxFrameSize, publisherConfirmsEnabled, deliveryTag, cancellationToken))); + + static Behaviour Unstarted(IObservable receivedFrames, IDispatcherAgent dispatcher, UInt64 maxFrameSize, Boolean publisherConfirmsEnabled, UInt64 deliveryTag, CancellationToken cancellationToken) => async context => { switch (context.Message) { - case (PublishMessage(var exchange, var routingKey, var routingFlags, var message), AsyncReplyChannel replyChannel): { + case PublishMessage(var exchange, var routingKey, var routingFlags, var message, var replyChannel): { var frameSubscription = receivedFrames .Where(frame => frame.Type == FrameType.Method) - .Subscribe(frame => context.Self.PostAsync(RawFrame.UnwrapMethod(frame))); - await dispatcher.PostAsync(Dispatch.Command(new BasicPublish( + .Subscribe(frame => context.Self.PostAsync(new FrameReceived(RawFrame.UnwrapMethod(frame)))); + await dispatcher.Dispatch(new BasicPublish( ExchangeName: exchange, RoutingKey : routingKey, Mandatory : routingFlags.HasFlag(RoutingFlags.Mandatory), Immediate : routingFlags.HasFlag(RoutingFlags.Immediate) - ))); - await dispatcher.PostAsync(Dispatch.ContentHeader(new ContentHeader( + )); + await dispatcher.Dispatch(new ContentHeader( ClassId : 0x3C, BodySize : (UInt64) message.Body.Length, Properties: message.Properties - ))); + )); foreach (var segment in message.Body.Split((Int32) maxFrameSize)) { - await dispatcher.PostAsync(Dispatch.ContentBody(segment)); + await dispatcher.Dispatch(segment); } if (publisherConfirmsEnabled) { - var cancelTimeout = cancellationToken.Register(() => context.Self.PostAsync(new TimeoutException())); + var cancelTimeout = cancellationToken.Register(() => context.Self.PostAsync(new Timeout())); return context with { Behaviour = AwaitingPublisherConfirm(deliveryTag, frameSubscription, replyChannel, cancelTimeout) }; } else { - replyChannel.Reply(true); + replyChannel.Reply(new Result.Ok(true)); frameSubscription.Dispose(); await context.Self.StopAsync(); return context; @@ -57,32 +64,32 @@ await dispatcher.PostAsync(Dispatch.ContentHeader(new ContentHeader( } }; - static Behaviour AwaitingPublisherConfirm(UInt64 deliveryTag, IDisposable frameSubscription, AsyncReplyChannel replyChannel, CancellationTokenRegistration scheduledTimeout) => + static Behaviour AwaitingPublisherConfirm(UInt64 deliveryTag, IDisposable frameSubscription, AsyncReplyChannel replyChannel, CancellationTokenRegistration scheduledTimeout) => async context => { switch (context.Message) { - case BasicAck ack when ack.DeliveryTag == deliveryTag: { + case FrameReceived(BasicAck ack) when ack.DeliveryTag == deliveryTag: { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(true); + replyChannel.Reply(new Result.Ok(true)); frameSubscription.Dispose(); await context.Self.StopAsync(); return context; } - case BasicNack nack when nack.DeliveryTag == deliveryTag: { + case FrameReceived(BasicNack nack) when nack.DeliveryTag == deliveryTag: { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new AmqpException("Server rejected the message")); // Why? + replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(new AmqpException("Server rejected the message")))); // Why? frameSubscription.Dispose(); await context.Self.StopAsync(); return context; } - case TimeoutException timeout: { - replyChannel.Reply(timeout); + case Timeout: { + replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(new TimeoutException()))); frameSubscription.Dispose(); await context.Self.StopAsync(); return context; } - case ChannelClose close: { + case FrameReceived(ChannelClose close): { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(AmqpException.Create(close.ReplyCode, close.ReplyText)); + replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(AmqpException.Create(close.ReplyCode, close.ReplyText)))); frameSubscription.Dispose(); await context.Self.StopAsync(); return context; @@ -90,4 +97,9 @@ static Behaviour AwaitingPublisherConfirm(UInt64 deliveryTag, IDisposable frameS default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(AwaitingPublisherConfirm)}' behaviour."); } }; + + async Task> IPublishAgent.Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties Properties, ReadOnlyMemory Body) message) { + var reply = await _agent.PostAndReplyAsync(replyChannel => new PublishMessage(exchange, routingKey, routingFlags, message, replyChannel)); + return (Result) reply; + } } diff --git a/Lapine.Core/Agents/RequestReplyAgent.cs b/Lapine.Core/Agents/RequestReplyAgent.cs index 3a26ab8..8917399 100644 --- a/Lapine.Core/Agents/RequestReplyAgent.cs +++ b/Lapine.Core/Agents/RequestReplyAgent.cs @@ -1,58 +1,71 @@ +using System.Reactive.Linq; + namespace Lapine.Agents; +using System.Runtime.ExceptionServices; using Lapine.Client; using Lapine.Protocol; using Lapine.Protocol.Commands; -using static Lapine.Agents.DispatcherAgent.Protocol; +interface IRequestReplyAgent +where TRequest : ICommand +where TReply : ICommand { + Task> Request(TRequest request); +} + +class RequestReplyAgent : IRequestReplyAgent +where TRequest : ICommand +where TReply : ICommand { + readonly IAgent _agent; -static class RequestReplyAgent { - static public IAgent StartNew(IObservable receivedFrames, IAgent dispatcher, CancellationToken cancellationToken = default) - where TRequest : ICommand - where TReply : ICommand { - return Agent.StartNew(AwaitingRequest(receivedFrames, dispatcher, cancellationToken)); - } + RequestReplyAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); - static Behaviour AwaitingRequest(IObservable receivedFrames, IAgent dispatcher, CancellationToken cancellationToken) - where TRequest : ICommand - where TReply : ICommand => + abstract record Protocol; + record SendRequest(TRequest Request, AsyncReplyChannel ReplyChannel) : Protocol; + record OnFrameReceived(ICommand Command) : Protocol; + record OnTimeout : Protocol; + + static public IRequestReplyAgent StartNew(IObservable receivedFrames, IDispatcherAgent dispatcher, CancellationToken cancellationToken = default) => + new RequestReplyAgent(Agent.StartNew(AwaitingRequest(receivedFrames, dispatcher, cancellationToken))); + + static Behaviour AwaitingRequest(IObservable receivedFrames, IDispatcherAgent dispatcher, CancellationToken cancellationToken) => async context => { switch (context.Message) { - case (TRequest request, AsyncReplyChannel replyChannel): { + case SendRequest(var request, var replyChannel): { var framesSubscription = receivedFrames - .Subscribe(frame => context.Self.PostAsync(RawFrame.UnwrapMethod(frame))); + .Subscribe(frame => context.Self.PostAsync(new OnFrameReceived(RawFrame.UnwrapMethod(frame)))); var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.Token.Register(() => context.Self.PostAsync(new TimeoutException())); + cts.Token.Register(() => context.Self.PostAsync(new OnTimeout())); - await dispatcher.PostAsync(Dispatch.Command(request)); + await dispatcher.Dispatch(request); - return context with { Behaviour = AwaitingReply(framesSubscription, cts, replyChannel) }; + return context with { Behaviour = AwaitingReply(framesSubscription, cts, replyChannel) }; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(AwaitingRequest)}' behaviour."); } }; - static Behaviour AwaitingReply(IDisposable framesSubscription, IDisposable scheduledTimeout, AsyncReplyChannel replyChannel) - where TReply : ICommand => + static Behaviour AwaitingReply(IDisposable framesSubscription, IDisposable scheduledTimeout, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { - case TReply reply: { - replyChannel.Reply(reply); + case OnFrameReceived(TReply reply): { + replyChannel.Reply(new Result.Ok(reply)); framesSubscription.Dispose(); scheduledTimeout.Dispose(); await context.Self.StopAsync(); return context; } - case TimeoutException timeout: { - replyChannel.Reply(timeout); + case OnTimeout: { + replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(new TimeoutException()))); framesSubscription.Dispose(); scheduledTimeout.Dispose(); await context.Self.StopAsync(); return context; } - case ChannelClose(var replyCode, var replyText, _): { - replyChannel.Reply(AmqpException.Create(replyCode, replyText)); + case OnFrameReceived(ChannelClose(var replyCode, var replyText, _)): { + replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(AmqpException.Create(replyCode, replyText)))); framesSubscription.Dispose(); scheduledTimeout.Dispose(); await context.Self.StopAsync(); @@ -61,4 +74,9 @@ static Behaviour AwaitingReply(IDisposable framesSubscription, IDisposab default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(AwaitingReply)}' behaviour."); } }; + + async Task> IRequestReplyAgent.Request(TRequest request) { + var reply = (Result) await _agent.PostAndReplyAsync(replyChannel => new SendRequest(request, replyChannel)); + return reply; + } } diff --git a/Lapine.Core/Agents/SocketAgent.cs b/Lapine.Core/Agents/SocketAgent.cs index 46a13e5..665e3e8 100644 --- a/Lapine.Core/Agents/SocketAgent.cs +++ b/Lapine.Core/Agents/SocketAgent.cs @@ -8,35 +8,51 @@ namespace Lapine.Agents; using static System.Math; using static System.Net.Sockets.SocketOptionLevel; using static System.Net.Sockets.SocketOptionName; -using static Lapine.Agents.SocketAgent.Protocol; using static Lapine.Client.ConnectionConfiguration; -static class SocketAgent { - static public class Protocol { - public record Connect(IPEndPoint Endpoint, CancellationToken CancellationToken = default); - public record Connected(IObservable ConnectionEvents, IObservable ReceivedFrames); - public record ConnectionFailed(Exception Fault); - public record RemoteDisconnected(Exception Fault); - public record Tune(UInt32 MaxFrameSize); - public record EnableTcpKeepAlives(TimeSpan ProbeTime, TimeSpan RetryInterval, Int32 RetryCount); - public record Transmit(ISerializable Entity); - public record Disconnect; - - internal record Poll; - } +abstract record ConnectionEvent; +record RemoteDisconnected(Exception Fault) : ConnectionEvent; + +abstract record ConnectResult; +record Connected(IObservable ConnectionEvents, IObservable ReceivedFrames) : ConnectResult; +record ConnectionFailed(Exception Fault) : ConnectResult; + +interface ISocketAgent { + Task ConnectAsync(IPEndPoint endpoint, CancellationToken cancellationToken = default); + Task Tune(UInt32 maxFrameSize); + Task EnableTcpKeepAlives(TimeSpan probeTime, TimeSpan retryInterval, Int32 retryCount); + Task Transmit(ISerializable entity); + Task Disconnect(); + Task StopAsync(); +} + +class SocketAgent : ISocketAgent { + readonly IAgent _agent; + + SocketAgent(IAgent agent) => + _agent = agent ?? throw new ArgumentNullException(nameof(agent)); + + abstract record Protocol; + record Connect(IPEndPoint Endpoint, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record Tune(UInt32 MaxFrameSize) : Protocol; + record EnableTcpKeepAlives(TimeSpan ProbeTime, TimeSpan RetryInterval, Int32 RetryCount) : Protocol; + record Transmit(ISerializable Entity) : Protocol; + record Disconnect : Protocol; + record Poll : Protocol; + record OnAsyncResult(IAsyncResult Result) : Protocol; - static public IAgent Create() => - Agent.StartNew(Disconnected()); + static public ISocketAgent Create() => + new SocketAgent(Agent.StartNew(Disconnected())); - static Behaviour Disconnected() => + static Behaviour Disconnected() => async context => { switch (context.Message) { - case (Connect(var endpoint, var cancellationToken), AsyncReplyChannel replyChannel): { + case Connect(var endpoint, var replyChannel, var cancellationToken): { var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); try { await socket.ConnectAsync(endpoint, cancellationToken); - var events = new Subject(); + var events = new Subject(); var receivedFrames = new Subject(); replyChannel.Reply(new Connected(events, receivedFrames)); @@ -44,7 +60,7 @@ static Behaviour Disconnected() => // Begin polling... await context.Self.PostAsync(new Poll()); - return context with {Behaviour = Connected(socket, events, receivedFrames)}; + return context with { Behaviour = ConnectedBehaviour(socket, events, receivedFrames) }; } catch (OperationCanceledException) { replyChannel.Reply(new ConnectionFailed(new TimeoutException())); @@ -61,7 +77,7 @@ static Behaviour Disconnected() => } }; - static Behaviour Connected(Socket socket, Subject connectionEvents, Subject receivedFrames) { + static Behaviour ConnectedBehaviour(Socket socket, Subject connectionEvents, Subject receivedFrames) { var transmitBuffer = new MemoryBufferWriter(4096); var (frameBuffer, tail) = (new Byte[DefaultMaximumFrameSize], 0); @@ -80,12 +96,12 @@ static Behaviour Connected(Socket socket, Subject connectionEvents, Subj socketFlags: SocketFlags.None, state : socket, callback : asyncResult => { - context.Self.PostAsync(asyncResult); + context.Self.PostAsync(new OnAsyncResult(asyncResult)); } ); return ValueTask.FromResult(context); } - case IAsyncResult asyncResult: { + case OnAsyncResult(var asyncResult): { try { tail += socket.EndReceive(asyncResult); @@ -132,4 +148,24 @@ static Behaviour Connected(Socket socket, Subject connectionEvents, Subj } }; } + + async Task ISocketAgent.ConnectAsync(IPEndPoint endpoint, CancellationToken cancellationToken) { + var reply = await _agent.PostAndReplyAsync(replyChannel => new Connect(endpoint, replyChannel, cancellationToken)); + return (ConnectResult) reply; + } + + async Task ISocketAgent.Tune(UInt32 maxFrameSize) => + await _agent.PostAsync(new Tune(maxFrameSize)); + + async Task ISocketAgent.EnableTcpKeepAlives(TimeSpan probeTime, TimeSpan retryInterval, Int32 retryCount) => + await _agent.PostAsync(new EnableTcpKeepAlives(probeTime, retryInterval, retryCount)); + + async Task ISocketAgent.Transmit(ISerializable entity) => + await _agent.PostAsync(new Transmit(entity)); + + async Task ISocketAgent.Disconnect() => + await _agent.PostAsync(new Disconnect()); + + async Task ISocketAgent.StopAsync() => + await _agent.StopAsync(); } diff --git a/Lapine.Core/Client/AmqpClient.cs b/Lapine.Core/Client/AmqpClient.cs index e6c2c15..b325ed2 100644 --- a/Lapine.Core/Client/AmqpClient.cs +++ b/Lapine.Core/Client/AmqpClient.cs @@ -3,11 +3,9 @@ namespace Lapine.Client; using System.Runtime.ExceptionServices; using Lapine.Agents; -using static Lapine.Agents.AmqpClientAgent.Protocol; - public class AmqpClient : IAsyncDisposable { readonly ConnectionConfiguration _connectionConfiguration; - readonly IAgent _agent; + readonly IAmqpClientAgent _agent; public AmqpClient(ConnectionConfiguration connectionConfiguration) { _connectionConfiguration = connectionConfiguration; @@ -15,9 +13,7 @@ public AmqpClient(ConnectionConfiguration connectionConfiguration) { } public async ValueTask ConnectAsync(CancellationToken cancellationToken = default) { - var command = new EstablishConnection(_connectionConfiguration, cancellationToken); - - switch (await _agent.PostAndReplyAsync(command)) { + switch (await _agent.EstablishConnection(_connectionConfiguration, cancellationToken)) { case true: { return; } @@ -31,10 +27,8 @@ public async ValueTask ConnectAsync(CancellationToken cancellationToken = defaul } public async ValueTask OpenChannelAsync(CancellationToken cancellationToken = default) { - var command = new OpenChannel(cancellationToken); - - switch (await _agent.PostAndReplyAsync(command)) { - case IAgent channelAgent: { + switch (await _agent.OpenChannel(cancellationToken)) { + case IChannelAgent channelAgent: { return new Channel(channelAgent, _connectionConfiguration); } case Exception fault: { @@ -48,8 +42,8 @@ public async ValueTask OpenChannelAsync(CancellationToken cancellationT } public async ValueTask DisposeAsync() { - await _agent.PostAndReplyAsync(new Disconnect()); - await _agent.StopAsync(); + await _agent.Disconnect(); + await _agent.Stop(); GC.SuppressFinalize(this); } diff --git a/Lapine.Core/Client/Channel.cs b/Lapine.Core/Client/Channel.cs index 35d8d7a..445c3a9 100644 --- a/Lapine.Core/Client/Channel.cs +++ b/Lapine.Core/Client/Channel.cs @@ -4,15 +4,13 @@ namespace Lapine.Client; using Lapine.Agents; using Lapine.Protocol; -using static Lapine.Agents.ChannelAgent.Protocol; - public class Channel { - readonly IAgent _agent; + readonly IChannelAgent _agent; readonly ConnectionConfiguration _connectionConfiguration; Boolean _closed = false; - internal Channel(IAgent agent, in ConnectionConfiguration connectionConfiguration) { + internal Channel(IChannelAgent agent, in ConnectionConfiguration connectionConfiguration) { _agent = agent ?? throw new ArgumentNullException(nameof(agent)); _connectionConfiguration = connectionConfiguration; } @@ -24,7 +22,7 @@ public async ValueTask CloseAsync(CancellationToken cancellationToken = default) using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new Close(cts.Token))) { + switch (await _agent.Close(cts.Token)) { case true: { _closed = true; break; @@ -43,7 +41,7 @@ public async ValueTask DeclareExchangeAsync(ExchangeDefinition definition, Cance using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new DeclareExchange(definition, cts.Token))) { + switch (await _agent.DeclareExchange(definition, cts.Token)) { case true: { return; } @@ -61,7 +59,7 @@ public async ValueTask DeleteExchangeAsync(String exchange, DeleteExchangeCondit using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new DeleteExchange(exchange, condition, cts.Token))) { + switch (await _agent.DeleteExchange(exchange, condition, cts.Token)) { case true: { return; } @@ -79,7 +77,7 @@ public async ValueTask DeclareQueueAsync(QueueDefinition definition, Cancellatio using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new DeclareQueue(definition, cts.Token))) { + switch (await _agent.DeclareQueue(definition, cts.Token)) { case true: { return; } @@ -97,7 +95,7 @@ public async ValueTask DeleteQueueAsync(String queue, DeleteQueueCondition condi using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new DeleteQueue(queue, condition, cts.Token))) { + switch (await _agent.DeleteQueue(queue, condition, cts.Token)) { case true: { return; } @@ -115,7 +113,7 @@ public async ValueTask BindQueueAsync(Binding binding, CancellationToken cancell using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new BindQueue(binding, cts.Token))) { + switch (await _agent.BindQueue(binding, cts.Token)) { case true: { return; } @@ -133,7 +131,7 @@ public async ValueTask UnbindQueueAsync(Binding binding, CancellationToken cance using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new UnbindQueue(binding, cts.Token))) { + switch (await _agent.UnbindQueue(binding, cts.Token)) { case true: { return; } @@ -144,16 +142,16 @@ public async ValueTask UnbindQueueAsync(Binding binding, CancellationToken cance } } - public async ValueTask PurgeQueueAsync(String queue, CancellationToken cancellationToken = default) { + public async ValueTask PurgeQueueAsync(String queue, CancellationToken cancellationToken = default) { if (_closed) throw new InvalidOperationException("Channel is closed."); using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new PurgeQueue(queue, cts.Token))) { - case true: { - return; + switch (await _agent.PurgeQueue(queue, cts.Token)) { + case UInt32 messageCount: { + return messageCount; } case Exception fault: { throw fault; @@ -169,9 +167,7 @@ public async ValueTask PublishAsync(String exchange, String routingKey, (Message using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - var command = new Publish(exchange, routingKey, routingFlags, (message.Properties.ToBasicProperties(), message.Payload), cts.Token); - - switch (await _agent.PostAndReplyAsync(command)) { + switch (await _agent.Publish(exchange, routingKey, routingFlags, (message.Properties.ToBasicProperties(), message.Payload), cts.Token)) { case true: { return; } @@ -189,7 +185,7 @@ public async ValueTask PublishAsync(String exchange, String routingKey, (Message using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new GetMessage(queue, acknowledgements, cts.Token))) { + switch (await _agent.GetMessage(queue, acknowledgements, cts.Token)) { case (DeliveryInfo delivery, BasicProperties properties, ReadOnlyMemory body): { return (delivery, MessageProperties.FromBasicProperties(properties), body); } @@ -204,7 +200,7 @@ public async ValueTask AcknowledgeAsync(UInt64 deliveryTag, Boolean multiple = f if (_closed) throw new InvalidOperationException("Channel is closed."); - switch (await _agent.PostAndReplyAsync(new Acknowledge(deliveryTag, multiple))) { + switch (await _agent.Acknowledge(deliveryTag, multiple)) { case true: { return; } @@ -219,7 +215,7 @@ public async ValueTask RejectAsync(UInt64 deliveryTag, Boolean requeue) { if (_closed) throw new InvalidOperationException("Channel is closed."); - switch (await _agent.PostAndReplyAsync(new Reject(deliveryTag, requeue))) { + switch (await _agent.Reject(deliveryTag, requeue)) { case true: { return; } @@ -237,13 +233,11 @@ public async ValueTask SetPrefetchLimitAsync(UInt16 limit, PrefetchLimitScope sc using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - var command = new SetPrefetchLimit(limit, scope switch { - PrefetchLimitScope.Consumer => false, - PrefetchLimitScope.Channel => true, - _ => throw new InvalidEnumArgumentException(nameof(scope), (Int32)scope, typeof(PrefetchLimitScope)) - }, cts.Token); - - switch (await _agent.PostAndReplyAsync(command)) { + switch (await _agent.SetPrefetchLimit(limit, scope switch { + PrefetchLimitScope.Consumer => false, + PrefetchLimitScope.Channel => true, + _ => throw new InvalidEnumArgumentException(nameof(scope), (Int32)scope, typeof(PrefetchLimitScope)) + }, cts.Token)) { case true: { return; } @@ -261,7 +255,7 @@ public async ValueTask ConsumeAsync(String queue, ConsumerConfiguration using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new Consume(queue, consumerConfiguration, arguments, cts.Token))) { + switch (await _agent.Consume(queue, consumerConfiguration, arguments, cts.Token)) { case String consumerTag: { return consumerTag; } @@ -279,7 +273,7 @@ public async ValueTask EnablePublisherConfirms(CancellationToken cancellationTok using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PostAndReplyAsync(new EnablePublisherConfirms(cts.Token))) { + switch (await _agent.EnablePublisherConfirms(cts.Token)) { case true: { return; } diff --git a/Lapine.Core/EnumerableExtensions.cs b/Lapine.Core/EnumerableExtensions.cs index f9dbb44..9d6dea2 100644 --- a/Lapine.Core/EnumerableExtensions.cs +++ b/Lapine.Core/EnumerableExtensions.cs @@ -2,7 +2,7 @@ namespace Lapine; using System.Collections.Immutable; -static public class EnumerableExtensions { +static class EnumerableExtensions { static public IImmutableQueue ToImmutableQueue(this IEnumerable items) => items.Aggregate( seed: ImmutableQueue.Empty, diff --git a/Lapine.Core/MemoryBufferWriter.cs b/Lapine.Core/MemoryBufferWriter.cs index 9642fba..f72a176 100644 --- a/Lapine.Core/MemoryBufferWriter.cs +++ b/Lapine.Core/MemoryBufferWriter.cs @@ -8,7 +8,7 @@ namespace Lapine; /// Represents a heap-based, memory backed output sink into which data can be written. /// /// The type of items in this instance. -internal sealed class MemoryBufferWriter : IBufferWriter, IMemoryOwner, IDisposable { +sealed class MemoryBufferWriter : IBufferWriter, IMemoryOwner, IDisposable { const Int32 DefaultInitialCapacity = 1024; readonly MemoryPool _pool; diff --git a/Lapine.Core/Result.cs b/Lapine.Core/Result.cs new file mode 100644 index 0000000..666cbdf --- /dev/null +++ b/Lapine.Core/Result.cs @@ -0,0 +1,26 @@ +namespace Lapine; + +using System.Runtime.ExceptionServices; + +abstract record Result { + public sealed record Ok(T Value) : Result; + public sealed record Fault(ExceptionDispatchInfo ExceptionDispatchInfo) : Result; + + public Result Map(Func fn) => this switch { + Ok(var value) => new Result.Ok(fn(value)), + Fault(var dispatchInfo) => new Result.Fault(dispatchInfo) + }; + + public T ValueOrThrow() { + switch (this) { + case Ok (var value): + return value; + case Fault (var dispatchInfo): { + dispatchInfo.Throw(); + throw dispatchInfo.SourceException; + } + default: + throw new Exception("Impossible!"); + } + } +} diff --git a/Lapine.sln b/Lapine.sln old mode 100755 new mode 100644