diff --git a/Lapine.Core/Agents/Agent.cs b/Lapine.Core/Agents/Agent.cs index de086d5..be2c02d 100644 --- a/Lapine.Core/Agents/Agent.cs +++ b/Lapine.Core/Agents/Agent.cs @@ -1,16 +1,22 @@ namespace Lapine.Agents; -using System.Runtime.CompilerServices; using System.Threading.Channels; interface IAgent { ValueTask PostAsync(TProtocol message, CancellationToken cancellationToken = default); - ValueTask PostAndReplyAsync(Func messageFactory); + ValueTask PostAndReplyAsync(Func messageFactory); + ValueTask PostAndReplyAsync(Func, TProtocol> messageFactory); ValueTask StopAsync(); } -class AsyncReplyChannel(Action reply) { - public void Reply(Object response) => reply(response); +class AsyncReplyChannel(TaskCompletionSource promise) { + public void Complete() => promise.SetResult(); + public void Fault(Exception fault) => promise.SetException(fault); +} + +class AsyncReplyChannel(TaskCompletionSource promise) { + public void Reply(TReply response) => promise.SetResult(response); + public void Fault(Exception fault) => promise.SetException(fault); } class Agent : IAgent { @@ -39,11 +45,23 @@ static public IAgent StartNew(Behaviour initialBehaviour) public async ValueTask PostAsync(TProtocol message, CancellationToken cancellationToken = default) => await _mailbox.Writer.WriteAsync(message, cancellationToken); - public async ValueTask PostAndReplyAsync(Func messageFactory) { + public async ValueTask PostAndReplyAsync(Func messageFactory) { + ArgumentNullException.ThrowIfNull(messageFactory); + + var promise = new TaskCompletionSource(); + var replyChannel = new AsyncReplyChannel(promise); + var message = messageFactory(replyChannel); + + await _mailbox.Writer.WriteAsync(message); + + await promise.Task; + } + + public async ValueTask PostAndReplyAsync(Func, TProtocol> messageFactory) { ArgumentNullException.ThrowIfNull(messageFactory); - var promise = AsyncValueTaskMethodBuilder.Create(); - var replyChannel = new AsyncReplyChannel(reply => promise.SetResult(reply)); + var promise = new TaskCompletionSource(); + var replyChannel = new AsyncReplyChannel(promise); var message = messageFactory(replyChannel); await _mailbox.Writer.WriteAsync(message); diff --git a/Lapine.Core/Agents/AmqpClientAgent.Behaviours.cs b/Lapine.Core/Agents/AmqpClientAgent.Behaviours.cs index f4a2918..1ab4a6b 100644 --- a/Lapine.Core/Agents/AmqpClientAgent.Behaviours.cs +++ b/Lapine.Core/Agents/AmqpClientAgent.Behaviours.cs @@ -7,6 +7,16 @@ namespace Lapine.Agents; using Lapine.Protocol; static partial class AmqpClientAgent { + record State( + ConnectionConfiguration ConnectionConfiguration, + ISocketAgent SocketAgent, + IHeartbeatAgent HeartbeatAgent, + IObservable ReceivedFrames, + IObservable ConnectionEvents, + IDispatcherAgent Dispatcher, + IImmutableList AvailableChannelIds + ); + static Behaviour Disconnected() => async context => { switch (context.Message) { @@ -17,7 +27,7 @@ static Behaviour Disconnected() => var remainingEndpoints = new Queue(connectionConfiguration.GetConnectionSequence()); if (remainingEndpoints.Count == 0) { - replyChannel.Reply(new Exception("No endpoints specified in connection configuration")); + replyChannel.Fault(new Exception("No endpoints specified in connection configuration")); return context; } @@ -27,123 +37,110 @@ static Behaviour Disconnected() => var endpoint = remainingEndpoints.Dequeue(); var socketAgent = SocketAgent.Create(); - switch (await socketAgent.ConnectAsync(endpoint, cts.Token)) { - case ConnectionFailed(var fault) when remainingEndpoints.Any(): { - accumulatedFailures.Add(fault); - continue; - } - case ConnectionFailed(var fault): { - accumulatedFailures.Add(fault); - replyChannel.Reply(new AggregateException("Could not connect to any of the configured endpoints", accumulatedFailures)); - return context; + try { + var (connectionEvents, receivedFrames) = await socketAgent.ConnectAsync(endpoint, cts.Token); + + var dispatcher = DispatcherAgent.Create(); + await dispatcher.DispatchTo(socketAgent, 0); + + var handshakeAgent = HandshakeAgent.Create( + receivedFrames : receivedFrames, + connectionEvents : connectionEvents, + dispatcher : dispatcher, + cancellationToken: cts.Token + ); + + var connectionAgreement = await handshakeAgent.StartHandshake(connectionConfiguration); + await socketAgent.Tune(connectionAgreement.MaxFrameSize); + + var heartbeatAgent = HeartbeatAgent.Create(); + + if (connectionConfiguration.ConnectionIntegrityStrategy.HeartbeatFrequency.HasValue) { + var heartbeatEvents = await heartbeatAgent.Start(receivedFrames, dispatcher, connectionConfiguration.ConnectionIntegrityStrategy.HeartbeatFrequency.Value); + heartbeatEvents.Subscribe(onNext: message => context.Self.PostAsync(new HeartbeatEventEventReceived(message))); } - case Connected(var connectionEvents, var receivedFrames): { - var dispatcher = DispatcherAgent.Create(); - await dispatcher.DispatchTo(socketAgent, 0); - - var handshakeAgent = HandshakeAgent.Create( - receivedFrames : receivedFrames, - connectionEvents : connectionEvents, - dispatcher : dispatcher, - cancellationToken: cts.Token - ); - - switch (await handshakeAgent.StartHandshake(connectionConfiguration)) { - case ConnectionAgreed(var connectionAgreement): { - await socketAgent.Tune(connectionAgreement.MaxFrameSize); - - var heartbeatAgent = HeartbeatAgent.Create(); - - if (connectionConfiguration.ConnectionIntegrityStrategy.HeartbeatFrequency.HasValue) { - var heartbeatEvents = await heartbeatAgent.Start(receivedFrames, dispatcher, connectionConfiguration.ConnectionIntegrityStrategy.HeartbeatFrequency.Value); - heartbeatEvents.Subscribe(onNext: message => context.Self.PostAsync(new HeartbeatEventEventReceived(message))); - } - - // If tcp keepalives are enabled, configure the socket... - if (connectionConfiguration.ConnectionIntegrityStrategy.KeepAliveSettings.HasValue) { - var (probeTime, retryInterval, retryCount) = connectionConfiguration.ConnectionIntegrityStrategy.KeepAliveSettings.Value; - - await socketAgent.EnableTcpKeepAlives(probeTime, retryInterval, retryCount); - } - - replyChannel.Reply(true); - - return context with { - Behaviour = Connected( - connectionConfiguration: connectionConfiguration, - socketAgent : socketAgent, - heartbeatAgent : heartbeatAgent, - receivedFrames : receivedFrames, - connectionEvents : connectionEvents, - dispatcher : dispatcher, - availableChannelIds : Enumerable.Range(1, connectionAgreement.MaxChannelCount) - .Select(channelId => (UInt16) channelId) - .ToImmutableList() - ) - }; - } - case HandshakeFailed(var fault): { - replyChannel.Reply(fault); - return context; - } - } - - break; + + // If tcp keepalives are enabled, configure the socket... + if (connectionConfiguration.ConnectionIntegrityStrategy.KeepAliveSettings.HasValue) { + var (probeTime, retryInterval, retryCount) = connectionConfiguration.ConnectionIntegrityStrategy.KeepAliveSettings.Value; + + await socketAgent.EnableTcpKeepAlives(probeTime, retryInterval, retryCount); } + + replyChannel.Complete(); + + var state = new State( + ConnectionConfiguration: connectionConfiguration, + SocketAgent : socketAgent, + HeartbeatAgent : heartbeatAgent, + ReceivedFrames : receivedFrames, + ConnectionEvents : connectionEvents, + Dispatcher : dispatcher, + AvailableChannelIds : Enumerable.Range(1, connectionAgreement.MaxChannelCount) + .Select(channelId => (UInt16)channelId) + .ToImmutableList() + ); + + return context with { + Behaviour = Connected(state) + }; + } + catch (Exception fault) { + accumulatedFailures.Add(fault); } } + replyChannel.Fault(new AggregateException("Could not connect to any of the configured endpoints", accumulatedFailures)); return context; } case Disconnect(var replyChannel): { - replyChannel.Reply(true); + replyChannel.Complete(); return context; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Disconnected)}' behaviour."); } }; - static Behaviour Connected(ConnectionConfiguration connectionConfiguration, ISocketAgent socketAgent, IHeartbeatAgent heartbeatAgent, IObservable receivedFrames, IObservable connectionEvents, IDispatcherAgent dispatcher, IImmutableList availableChannelIds) => + static Behaviour Connected(State state) => async context => { switch (context.Message) { case HeartbeatEventEventReceived(RemoteFlatline): { - await heartbeatAgent.Stop(); - await dispatcher.Stop(); - await socketAgent.Disconnect(); - await socketAgent.StopAsync(); + await state.HeartbeatAgent.Stop(); + await state.Dispatcher.Stop(); + await state.SocketAgent.Disconnect(); + await state.SocketAgent.StopAsync(); return context with { Behaviour = Disconnected() }; } case Disconnect(var replyChannel): { - await heartbeatAgent.Stop(); - await dispatcher.Stop(); - await socketAgent.Disconnect(); - await socketAgent.StopAsync(); + await state.HeartbeatAgent.Stop(); + await state.Dispatcher.Stop(); + await state.SocketAgent.Disconnect(); + await state.SocketAgent.StopAsync(); - replyChannel.Reply(true); + replyChannel.Complete(); return context; } case OpenChannel(var replyChannel, var cancellationToken): { - var channelId = availableChannelIds[0]; - var channelAgent = ChannelAgent.Create(connectionConfiguration.MaximumFrameSize); + var channelId = state.AvailableChannelIds[0]; + var channelAgent = ChannelAgent.Create(state.ConnectionConfiguration.MaximumFrameSize); using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(connectionConfiguration.CommandTimeout); - - switch (await channelAgent.Open(channelId,receivedFrames.Where(frame => frame.Channel == channelId), connectionEvents, socketAgent, cts.Token)) { - case true: { - replyChannel.Reply(channelAgent); - return context with { - Behaviour = Connected(connectionConfiguration, socketAgent, heartbeatAgent, receivedFrames, connectionEvents, dispatcher, availableChannelIds.Remove(channelId)) - }; - } - case Exception fault: { - replyChannel.Reply(fault); - break; - } - } - - return context; + cts.CancelAfter(state.ConnectionConfiguration.CommandTimeout); + + return await channelAgent.Open(channelId, state.ReceivedFrames.Where(frame => frame.Channel == channelId), state.ConnectionEvents, state.SocketAgent, cts.Token) + .ContinueWith( + onCompleted: () => { + replyChannel.Reply(channelAgent); + return context with { + Behaviour = Connected(state with { AvailableChannelIds = state.AvailableChannelIds.Remove(channelId) }) + }; + }, + onFaulted: fault => { + replyChannel.Fault(fault); + return context; + } + ); } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Connected)}' behaviour."); } diff --git a/Lapine.Core/Agents/AmqpClientAgent.Protocol.cs b/Lapine.Core/Agents/AmqpClientAgent.Protocol.cs index d52121b..fde2dea 100644 --- a/Lapine.Core/Agents/AmqpClientAgent.Protocol.cs +++ b/Lapine.Core/Agents/AmqpClientAgent.Protocol.cs @@ -12,7 +12,7 @@ record EstablishConnection( ) : Protocol; record OpenChannel( - AsyncReplyChannel ReplyChannel, + AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default ) : Protocol; diff --git a/Lapine.Core/Agents/AmqpClientAgent.Wrapper.cs b/Lapine.Core/Agents/AmqpClientAgent.Wrapper.cs index a14b1a6..02c3ebf 100644 --- a/Lapine.Core/Agents/AmqpClientAgent.Wrapper.cs +++ b/Lapine.Core/Agents/AmqpClientAgent.Wrapper.cs @@ -4,13 +4,13 @@ namespace Lapine.Agents; static partial class AmqpClientAgent { class Wrapper(IAgent agent) : IAmqpClientAgent { - async Task IAmqpClientAgent.EstablishConnection(ConnectionConfiguration configuration, CancellationToken cancellationToken) => + async Task IAmqpClientAgent.EstablishConnection(ConnectionConfiguration configuration, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new EstablishConnection(configuration, replyChannel, cancellationToken)); - async Task IAmqpClientAgent.OpenChannel(CancellationToken cancellationToken) => - await agent.PostAndReplyAsync(replyChannel => new OpenChannel(replyChannel, cancellationToken)); + async Task IAmqpClientAgent.OpenChannel(CancellationToken cancellationToken) => + await agent.PostAndReplyAsync(replyChannel => new OpenChannel(replyChannel, cancellationToken)); - async Task IAmqpClientAgent.Disconnect() => + async Task IAmqpClientAgent.Disconnect() => await agent.PostAndReplyAsync(replyChannel => new Disconnect(replyChannel)); async Task IAmqpClientAgent.Stop() => diff --git a/Lapine.Core/Agents/AmqpClientAgent.cs b/Lapine.Core/Agents/AmqpClientAgent.cs index 539eb6a..1339b5b 100644 --- a/Lapine.Core/Agents/AmqpClientAgent.cs +++ b/Lapine.Core/Agents/AmqpClientAgent.cs @@ -3,9 +3,9 @@ namespace Lapine.Agents; using Lapine.Client; interface IAmqpClientAgent { - Task EstablishConnection(ConnectionConfiguration configuration, CancellationToken cancellationToken = default); - Task OpenChannel(CancellationToken cancellationToken = default); - Task Disconnect(); + Task EstablishConnection(ConnectionConfiguration configuration, CancellationToken cancellationToken = default); + Task OpenChannel(CancellationToken cancellationToken = default); + Task Disconnect(); Task Stop(); } diff --git a/Lapine.Core/Agents/ChannelAgent.Behaviours.cs b/Lapine.Core/Agents/ChannelAgent.Behaviours.cs index 05798b5..0470a47 100644 --- a/Lapine.Core/Agents/ChannelAgent.Behaviours.cs +++ b/Lapine.Core/Agents/ChannelAgent.Behaviours.cs @@ -12,25 +12,25 @@ static Behaviour Closed(UInt32 maxFrameSize) => case Open(var channelId, var receivedFrames, var connectionEvents, var socketAgent, var replyChannel, var cancellationToken): { var dispatcher = DispatcherAgent.Create(); var consumers = ImmutableDictionary.Empty; + await dispatcher.DispatchTo(socketAgent, channelId); var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); - switch (await processManager.Request(new ChannelOpen())) { - case Result.Ok: { - replyChannel.Reply(true); - return context with { Behaviour = OpenBehaviour(maxFrameSize, receivedFrames, dispatcher, consumers) }; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - return context; - } - } - break; + return await processManager.Request(new ChannelOpen()) + .ContinueWith( + onCompleted: _ => { + replyChannel.Complete(); + return context with { Behaviour = OpenBehaviour(maxFrameSize, receivedFrames, dispatcher, consumers) }; + }, + onFaulted: fault => { + replyChannel.Fault(fault); + return context; + } + ); } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Closed)}' behaviour."); } - return context; }; static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable receivedFrames, IDispatcherAgent dispatcher, IImmutableDictionary consumers, UInt64 deliveryTag = 1, Boolean enablePublisherConfirms = false) => @@ -39,17 +39,15 @@ static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable.StartNew(receivedFrames, dispatcher, cancellationToken); - switch (await processManager.Request(new ChannelClose(0, String.Empty, (0, 0)))) { - case Result.Ok: { - replyChannel.Reply(true); - await context.Self.StopAsync(); - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(new ChannelClose(0, String.Empty, (0, 0))) + .ContinueWith( + onCompleted: async () => { + replyChannel.Complete(); + await context.Self.StopAsync(); + }, + onFaulted: replyChannel.Fault + ); + return context; } case DeclareExchange(var exchange, var replyChannel, var cancellationToken): { @@ -66,16 +64,12 @@ static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable.Ok: { - replyChannel.Reply(true); - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(command) + .ContinueWith( + onCompleted: replyChannel.Complete, + onFaulted : replyChannel.Fault + ); + return context; } case DeleteExchange(var name, var condition, var replyChannel, var cancellationToken): { @@ -87,16 +81,12 @@ static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable.Ok: { - replyChannel.Reply(true); - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(command) + .ContinueWith( + onCompleted: replyChannel.Complete, + onFaulted : replyChannel.Fault + ); + return context; } case DeclareQueue(var queue, var replyChannel, var cancellationToken): { @@ -112,16 +102,12 @@ static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable.Ok: { - replyChannel.Reply(true); - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(command) + .ContinueWith( + onCompleted: replyChannel.Complete, + onFaulted : replyChannel.Fault + ); + return context; } case DeleteQueue(var name, var condition, var replyChannel, var cancellationToken): { @@ -134,16 +120,12 @@ static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable.Ok: { - replyChannel.Reply(true); - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(command) + .ContinueWith( + onCompleted: replyChannel.Complete, + onFaulted : replyChannel.Fault + ); + return context; } case BindQueue(var binding, var replyChannel, var cancellationToken): { @@ -157,16 +139,12 @@ static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable.Ok: { - replyChannel.Reply(true); - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(command) + .ContinueWith( + onCompleted: replyChannel.Complete, + onFaulted : replyChannel.Fault + ); + return context; } case UnbindQueue(var binding, var replyChannel, var cancellationToken): { @@ -179,31 +157,23 @@ static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable.Ok: { - replyChannel.Reply(true); - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(command) + .ContinueWith( + onCompleted: replyChannel.Complete, + onFaulted : replyChannel.Fault + ); + return context; } case PurgeQueue(var name, var replyChannel, var cancellationToken): { var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); - switch (await processManager.Request(new QueuePurge(name, NoWait: false))) { - case Result.Ok(var queuePurgeOk): { - replyChannel.Reply(queuePurgeOk.MessageCount); - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(new QueuePurge(name, NoWait: false)) + .ContinueWith( + onCompleted: queuePurgeOk => replyChannel.Reply(queuePurgeOk.MessageCount), + onFaulted : replyChannel.Fault + ); + return context; } case Publish(var exchange, var routingKey, var routingFlags, var message, var replyChannel, var cancellationToken): { @@ -216,20 +186,17 @@ static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable.Ok: { - replyChannel.Reply(true); - if (enablePublisherConfirms) { - deliveryTag += 1; - } + await publishAgent.Publish(exchange, routingKey, routingFlags, message) + .ContinueWith( + onCompleted: () => { + replyChannel.Complete(); + + if (enablePublisherConfirms) + deliveryTag += 1; + }, + onFaulted: replyChannel.Fault + ); - break; - } - case Result.Fault(var fault): { - replyChannel.Reply(fault); - break; - } - } return context; } case GetMessage(var queue, var acknowledgements, var replyChannel, var cancellationToken): { @@ -239,21 +206,23 @@ static Behaviour OpenBehaviour(UInt32 maxFrameSize, IObservable { + switch (result) { + case NoMessage: { + replyChannel.Reply(null); + break; + } + case Message(var deliveryInfo, var properties, var body): { + replyChannel.Reply((deliveryInfo, properties, body)); + break; + } + } + }, + onFaulted: replyChannel.Fault + ); + return context; } case Acknowledge(var deliveryTag, var multiple, var replyChannel): { @@ -262,7 +231,7 @@ await dispatcher.Dispatch(new BasicAck( Multiple : multiple )); - replyChannel.Reply(true); + replyChannel.Complete(); return context; } case Reject(var deliveryTag, var requeue, var replyChannel): { @@ -271,7 +240,7 @@ await dispatcher.Dispatch(new BasicReject( ReQueue : requeue )); - replyChannel.Reply(true); + replyChannel.Complete(); return context; } case SetPrefetchLimit(var limit, var global, var replyChannel, var cancellationToken): { @@ -283,50 +252,45 @@ await dispatcher.Dispatch(new BasicReject( Global : global ); - switch (await processManager.Request(command)) { - case Result.Ok: { - replyChannel.Reply(true); - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(command) + .ContinueWith( + onCompleted: replyChannel.Complete, + onFaulted : replyChannel.Fault + ); + return context; } case Consume(var queue, var configuration, var arguments, var replyChannel, var cancellationToken): { var consumerTag = $"{Guid.NewGuid()}"; var consumer = ConsumerAgent.Create(); - switch (await consumer.StartConsuming(consumerTag, receivedFrames, dispatcher, queue, configuration, arguments)) { - case true: { - replyChannel.Reply(consumerTag); - return context with { - Behaviour = OpenBehaviour(maxFrameSize, receivedFrames, dispatcher, consumers.Add(consumerTag, consumer)) - }; - } - case Exception fault: { - replyChannel.Reply(fault); - break; - } - } - return context; + return await consumer.StartConsuming(consumerTag, receivedFrames, dispatcher, queue, configuration, arguments) + .ContinueWith( + onCompleted: () => { + replyChannel.Reply(consumerTag); + + return context with { + Behaviour = OpenBehaviour(maxFrameSize, receivedFrames, dispatcher, consumers.Add(consumerTag, consumer)) + }; + }, + onFaulted: fault => { + replyChannel.Fault(fault); + return context; + } + ); } case EnablePublisherConfirms(var replyChannel, var cancellationToken): { var processManager = RequestReplyAgent.StartNew(receivedFrames, dispatcher, cancellationToken); - switch (await processManager.Request(new ConfirmSelect(NoWait: false))) { - case Result.Ok: { - replyChannel.Reply(true); - enablePublisherConfirms = true; - break; - } - case Result.Fault(var exceptionDispatchInfo): { - replyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } + await processManager.Request(new ConfirmSelect(NoWait: false)) + .ContinueWith( + onCompleted: _ => { + replyChannel.Complete(); + enablePublisherConfirms = true; + }, + onFaulted: replyChannel.Fault + ); + return context; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Open)}' behaviour."); diff --git a/Lapine.Core/Agents/ChannelAgent.Protocol.cs b/Lapine.Core/Agents/ChannelAgent.Protocol.cs index 7691180..52b1eba 100644 --- a/Lapine.Core/Agents/ChannelAgent.Protocol.cs +++ b/Lapine.Core/Agents/ChannelAgent.Protocol.cs @@ -9,7 +9,7 @@ abstract record Protocol; record Open( UInt16 ChannelId, IObservable ReceivedFrames, - IObservable ConnectionEvents, + IObservable ConnectionEvents, ISocketAgent SocketAgent, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default @@ -60,7 +60,7 @@ record UnbindQueue( record PurgeQueue( String Queue, - AsyncReplyChannel ReplyChannel, + AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default ) : Protocol; @@ -76,7 +76,7 @@ CancellationToken CancellationToken record GetMessage( String Queue, Acknowledgements Acknowledgements, - AsyncReplyChannel ReplyChannel, + AsyncReplyChannel<(DeliveryInfo delivery, BasicProperties properties, ReadOnlyMemory body)?> ReplyChannel, CancellationToken CancellationToken = default ) : Protocol; @@ -103,7 +103,7 @@ record Consume( String Queue, ConsumerConfiguration ConsumerConfiguration, IReadOnlyDictionary? Arguments, - AsyncReplyChannel ReplyChannel, + AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default ) : Protocol; diff --git a/Lapine.Core/Agents/ChannelAgent.Wrapper.cs b/Lapine.Core/Agents/ChannelAgent.Wrapper.cs index 1457c87..cc11268 100644 --- a/Lapine.Core/Agents/ChannelAgent.Wrapper.cs +++ b/Lapine.Core/Agents/ChannelAgent.Wrapper.cs @@ -5,52 +5,52 @@ namespace Lapine.Agents; static partial class ChannelAgent { class Wrapper(IAgent agent) : IChannelAgent { - async Task IChannelAgent.Open(UInt16 channelId, IObservable frameStream, IObservable connectionEvents, ISocketAgent socketAgent, CancellationToken cancellationToken) => + async Task IChannelAgent.Open(UInt16 channelId, IObservable frameStream, IObservable connectionEvents, ISocketAgent socketAgent, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new Open(channelId, frameStream, connectionEvents, socketAgent, replyChannel, cancellationToken)); - async Task IChannelAgent.Close(CancellationToken cancellationToken) => + async Task IChannelAgent.Close(CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new Close(replyChannel, cancellationToken)); - async Task IChannelAgent.DeclareExchange(ExchangeDefinition definition, CancellationToken cancellationToken) => + async Task IChannelAgent.DeclareExchange(ExchangeDefinition definition, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new DeclareExchange(definition, replyChannel, cancellationToken)); - async Task IChannelAgent.DeleteExchange(String exchange, DeleteExchangeCondition condition, CancellationToken cancellationToken) => + async Task IChannelAgent.DeleteExchange(String exchange, DeleteExchangeCondition condition, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new DeleteExchange(exchange, condition, replyChannel, cancellationToken)); - async Task IChannelAgent.DeclareQueue(QueueDefinition definition, CancellationToken cancellationToken) => + async Task IChannelAgent.DeclareQueue(QueueDefinition definition, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new DeclareQueue(definition, replyChannel, cancellationToken)); - async Task IChannelAgent.DeleteQueue(String queue, DeleteQueueCondition condition, CancellationToken cancellationToken) => + async Task IChannelAgent.DeleteQueue(String queue, DeleteQueueCondition condition, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new DeleteQueue(queue, condition, replyChannel, cancellationToken)); - async Task IChannelAgent.BindQueue(Binding binding, CancellationToken cancellationToken) => + async Task IChannelAgent.BindQueue(Binding binding, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new BindQueue(binding, replyChannel, cancellationToken)); - async Task IChannelAgent.UnbindQueue(Binding binding, CancellationToken cancellationToken) => + async Task IChannelAgent.UnbindQueue(Binding binding, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new UnbindQueue(binding, replyChannel, cancellationToken)); - async Task IChannelAgent.PurgeQueue(String queue, CancellationToken cancellationToken) => - await agent.PostAndReplyAsync(replyChannel => new PurgeQueue(queue, replyChannel, cancellationToken)); + async Task IChannelAgent.PurgeQueue(String queue, CancellationToken cancellationToken) => + await agent.PostAndReplyAsync(replyChannel => new PurgeQueue(queue, replyChannel, cancellationToken)); - async Task IChannelAgent.Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties, ReadOnlyMemory) message, CancellationToken cancellationToken) => + async Task IChannelAgent.Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties, ReadOnlyMemory) message, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new Publish(exchange, routingKey, routingFlags, message, replyChannel, cancellationToken)); - async Task IChannelAgent.GetMessage(String queue, Acknowledgements acknowledgements, CancellationToken cancellationToken) => - await agent.PostAndReplyAsync(replyChannel => new GetMessage(queue, acknowledgements, replyChannel, cancellationToken)); + async Task<(DeliveryInfo delivery, BasicProperties properties, ReadOnlyMemory body)?> IChannelAgent.GetMessage(String queue, Acknowledgements acknowledgements, CancellationToken cancellationToken) => + await agent.PostAndReplyAsync<(DeliveryInfo delivery, BasicProperties properties, ReadOnlyMemory body)?>(replyChannel => new GetMessage(queue, acknowledgements, replyChannel, cancellationToken)); - async Task IChannelAgent.Acknowledge(UInt64 deliveryTag, Boolean multiple) => + async Task IChannelAgent.Acknowledge(UInt64 deliveryTag, Boolean multiple) => await agent.PostAndReplyAsync(replyChannel => new Acknowledge(deliveryTag, multiple, replyChannel)); - async Task IChannelAgent.Reject(UInt64 deliveryTag, Boolean requeue) => + async Task IChannelAgent.Reject(UInt64 deliveryTag, Boolean requeue) => await agent.PostAndReplyAsync(replyChannel => new Reject(deliveryTag, requeue, replyChannel)); - async Task IChannelAgent.SetPrefetchLimit(UInt16 limit, Boolean global, CancellationToken cancellationToken) => + async Task IChannelAgent.SetPrefetchLimit(UInt16 limit, Boolean global, CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new SetPrefetchLimit(limit, global, replyChannel, cancellationToken)); - async Task IChannelAgent.Consume(String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments, CancellationToken cancellationToken) => - await agent.PostAndReplyAsync(replyChannel => new Consume(queue, consumerConfiguration, arguments, replyChannel, cancellationToken)); + async Task IChannelAgent.Consume(String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments, CancellationToken cancellationToken) => + await agent.PostAndReplyAsync(replyChannel => new Consume(queue, consumerConfiguration, arguments, replyChannel, cancellationToken)); - async Task IChannelAgent.EnablePublisherConfirms(CancellationToken cancellationToken) => + async Task IChannelAgent.EnablePublisherConfirms(CancellationToken cancellationToken) => await agent.PostAndReplyAsync(replyChannel => new EnablePublisherConfirms(replyChannel, cancellationToken)); } } diff --git a/Lapine.Core/Agents/ChannelAgent.cs b/Lapine.Core/Agents/ChannelAgent.cs index 47477fb..72b2683 100644 --- a/Lapine.Core/Agents/ChannelAgent.cs +++ b/Lapine.Core/Agents/ChannelAgent.cs @@ -4,22 +4,22 @@ namespace Lapine.Agents; using Lapine.Protocol; interface IChannelAgent { - Task Open(UInt16 channelId, IObservable frameStream, IObservable connectionEvents, ISocketAgent socketAgent, CancellationToken cancellationToken = default); - Task Close(CancellationToken cancellationToken = default); - Task DeclareExchange(ExchangeDefinition definition, CancellationToken cancellationToken = default); - Task DeleteExchange(String exchange, DeleteExchangeCondition condition, CancellationToken cancellationToken = default); - Task DeclareQueue(QueueDefinition definition, CancellationToken cancellationToken = default); - Task DeleteQueue(String queue, DeleteQueueCondition condition, CancellationToken cancellationToken = default); - Task BindQueue(Binding binding, CancellationToken cancellationToken = default); - Task UnbindQueue(Binding binding, CancellationToken cancellationToken = default); - Task PurgeQueue(String queue, CancellationToken cancellationToken = default); - Task Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties, ReadOnlyMemory) message, CancellationToken cancellationToken = default); - Task GetMessage(String queue, Acknowledgements acknowledgements, CancellationToken cancellationToken = default); - Task Acknowledge(UInt64 deliveryTag, Boolean multiple); - Task Reject(UInt64 deliveryTag, Boolean requeue); - Task SetPrefetchLimit(UInt16 limit, Boolean global, CancellationToken cancellationToken = default); - Task Consume(String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments, CancellationToken cancellationToken = default); - Task EnablePublisherConfirms(CancellationToken cancellationToken = default); + Task Open(UInt16 channelId, IObservable frameStream, IObservable connectionEvents, ISocketAgent socketAgent, CancellationToken cancellationToken = default); + Task Close(CancellationToken cancellationToken = default); + Task DeclareExchange(ExchangeDefinition definition, CancellationToken cancellationToken = default); + Task DeleteExchange(String exchange, DeleteExchangeCondition condition, CancellationToken cancellationToken = default); + Task DeclareQueue(QueueDefinition definition, CancellationToken cancellationToken = default); + Task DeleteQueue(String queue, DeleteQueueCondition condition, CancellationToken cancellationToken = default); + Task BindQueue(Binding binding, CancellationToken cancellationToken = default); + Task UnbindQueue(Binding binding, CancellationToken cancellationToken = default); + Task PurgeQueue(String queue, CancellationToken cancellationToken = default); + Task Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties, ReadOnlyMemory) message, CancellationToken cancellationToken = default); + Task<(DeliveryInfo delivery, BasicProperties properties, ReadOnlyMemory body)?> GetMessage(String queue, Acknowledgements acknowledgements, CancellationToken cancellationToken = default); + Task Acknowledge(UInt64 deliveryTag, Boolean multiple); + Task Reject(UInt64 deliveryTag, Boolean requeue); + Task SetPrefetchLimit(UInt16 limit, Boolean global, CancellationToken cancellationToken = default); + Task Consume(String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments, CancellationToken cancellationToken = default); + Task EnablePublisherConfirms(CancellationToken cancellationToken = default); } static partial class ChannelAgent { diff --git a/Lapine.Core/Agents/ConsumerAgent.Behaviours.cs b/Lapine.Core/Agents/ConsumerAgent.Behaviours.cs index 65a522d..967a64e 100644 --- a/Lapine.Core/Agents/ConsumerAgent.Behaviours.cs +++ b/Lapine.Core/Agents/ConsumerAgent.Behaviours.cs @@ -12,11 +12,22 @@ readonly record struct Message( MemoryBufferWriter Body ); + record State( + String ConsumerTag, + IObservable ReceivedFrames, + IDispatcherAgent Dispatcher, + IMessageAssemblerAgent Assembler, + ConsumerConfiguration ConsumerConfiguration, + IImmutableQueue AvailableHandlers, + IImmutableList BusyHandlers, + IImmutableQueue Inbox + ); + static Behaviour Unstarted() => async context => { switch (context.Message) { case StartConsuming start: { - var processManager = RequestReplyAgent.StartNew(start.ReceivedFrames, start.Dispatcher, CancellationToken.None); + var processManager = RequestReplyAgent.StartNew(start.ReceivedFrames, start.Dispatcher); var basicConsume = new BasicConsume( QueueName : start.Queue, @@ -31,127 +42,104 @@ static Behaviour Unstarted() => Arguments: start.Arguments ?? ImmutableDictionary.Empty ); - switch (await processManager.Request(basicConsume)) { - case Result.Ok(var basicConsumeOk): { - var handlers = Enumerable.Range(0, start.ConsumerConfiguration.MaxDegreeOfParallelism) - .Select(_ => MessageHandlerAgent.Create(start.Self)) - .ToImmutableQueue(); - var assembler = MessageAssemblerAgent.StartNew(); - var receivedMessages = await assembler.Begin(start.ReceivedFrames, start.Self); - receivedMessages.Subscribe(async message => await context.Self.PostAsync(new ConsumeMessage(message.DeliveryInfo, message.Properties, message.Buffer))); - - start.ReplyChannel.Reply(true); - - return context with { Behaviour = Running( - consumerTag : basicConsumeOk.ConsumerTag, - receivedFrames : start.ReceivedFrames, - dispatcher : start.Dispatcher, - assembler : assembler, - consumerConfiguration: start.ConsumerConfiguration, - availableHandlers : handlers, - busyHandlers : ImmutableList.Empty, - inbox : ImmutableQueue.Empty - ) }; - } - case Result.Fault(var exceptionDispatchInfo): { - start.ReplyChannel.Reply(exceptionDispatchInfo.SourceException); - break; - } - } - break; + return await processManager.Request(basicConsume) + .ContinueWith( + onCompleted: async basicConsumeOk => { + var handlers = Enumerable.Range(0, start.ConsumerConfiguration.MaxDegreeOfParallelism) + .Select(_ => MessageHandlerAgent.Create(start.Self)) + .ToImmutableQueue(); + var assembler = MessageAssemblerAgent.StartNew(); + var receivedMessages = await assembler.Begin(start.ReceivedFrames, start.Self); + receivedMessages.Subscribe(async message => await context.Self.PostAsync(new ConsumeMessage(message.DeliveryInfo, message.Properties, message.Buffer))); + + start.ReplyChannel.Complete(); + + var state = new State( + ConsumerTag : basicConsumeOk.ConsumerTag, + ReceivedFrames : start.ReceivedFrames, + Dispatcher : start.Dispatcher, + Assembler : assembler, + ConsumerConfiguration: start.ConsumerConfiguration, + AvailableHandlers : handlers, + BusyHandlers : ImmutableList.Empty, + Inbox : ImmutableQueue.Empty + ); + + return context with { Behaviour = Running(state) }; + }, + onFaulted: fault => { + start.ReplyChannel.Fault(fault); + return context; + } + ); } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Unstarted)}' behaviour."); } - return context; }; - static Behaviour Running(String consumerTag, IObservable receivedFrames, IDispatcherAgent dispatcher, IMessageAssemblerAgent assembler, ConsumerConfiguration consumerConfiguration, IImmutableQueue availableHandlers, IImmutableList busyHandlers, IImmutableQueue inbox) => + static Behaviour Running(State state) => async context => { switch (context.Message) { - case ConsumeMessage(var deliveryInfo, var properties, var buffer) when availableHandlers.Any(): { - availableHandlers = availableHandlers.Dequeue(out var handler); - await handler.HandleMessage(dispatcher, consumerConfiguration, deliveryInfo, properties, buffer); - - return context with { Behaviour = Running( - consumerTag : consumerTag, - receivedFrames : receivedFrames, - dispatcher : dispatcher, - assembler : assembler, - consumerConfiguration: consumerConfiguration, - availableHandlers : availableHandlers, - busyHandlers : busyHandlers.Add(handler), - inbox : inbox - ) }; + case ConsumeMessage(var deliveryInfo, var properties, var buffer) when state.AvailableHandlers.Any(): { + var availableHandlers = state.AvailableHandlers.Dequeue(out var handler); + await handler.HandleMessage(state.Dispatcher, state.ConsumerConfiguration, deliveryInfo, properties, buffer); + + return context with { + Behaviour = Running(state with { + AvailableHandlers = availableHandlers, + BusyHandlers = state.BusyHandlers.Add(handler) + }) + }; } - case ConsumeMessage(var deliveryInfo, var properties, var buffer) when availableHandlers.IsEmpty: { - return context with { Behaviour = Running( - consumerTag : consumerTag, - receivedFrames : receivedFrames, - dispatcher : dispatcher, - assembler : assembler, - consumerConfiguration: consumerConfiguration, - availableHandlers : availableHandlers, - busyHandlers : busyHandlers, - inbox : inbox.Enqueue(new Message( - DeliveryInfo: deliveryInfo, - Properties : properties, - Body : buffer - )) - ) }; + case ConsumeMessage(var deliveryInfo, var properties, var buffer) when state.AvailableHandlers.IsEmpty: { + return context with { + Behaviour = Running(state with { + Inbox = state.Inbox.Enqueue(new Message(deliveryInfo, properties, buffer)) + }) + }; } - case HandlerReady(var handler) when inbox.IsEmpty: { - return context with { Behaviour = Running( - consumerTag : consumerTag, - receivedFrames : receivedFrames, - dispatcher : dispatcher, - assembler : assembler, - consumerConfiguration: consumerConfiguration, - availableHandlers : availableHandlers.Enqueue(handler), - busyHandlers : busyHandlers.Remove(handler), - inbox : inbox - ) }; + case HandlerReady(var handler) when state.Inbox.IsEmpty: { + return context with { + Behaviour = Running(state with { + AvailableHandlers = state.AvailableHandlers.Enqueue(handler), + BusyHandlers = state.BusyHandlers.Remove(handler) + }) + }; } - case HandlerReady(var handler) when inbox.Any(): { - inbox = inbox.Dequeue(out var message); - - await handler.HandleMessage(dispatcher, consumerConfiguration, message.DeliveryInfo, message.Properties, message.Body); - - return context with { Behaviour = Running( - consumerTag : consumerTag, - receivedFrames : receivedFrames, - dispatcher : dispatcher, - assembler : assembler, - consumerConfiguration: consumerConfiguration, - availableHandlers : availableHandlers, - busyHandlers : busyHandlers, - inbox : inbox - ) }; + case HandlerReady(var handler) when state.Inbox.Any(): { + var inbox = state.Inbox.Dequeue(out var message); + + await handler.HandleMessage(state.Dispatcher, state.ConsumerConfiguration, message.DeliveryInfo, message.Properties, message.Body); + + return context with { + Behaviour = Running(state with { Inbox = inbox }) + }; } case Stop: { var processManager = RequestReplyAgent.StartNew( - receivedFrames : receivedFrames, - dispatcher : dispatcher, + receivedFrames : state.ReceivedFrames, + dispatcher : state.Dispatcher, cancellationToken: CancellationToken.None ); - switch (await processManager.Request(new BasicCancel(consumerTag, false))) { - case Result.Ok(var basicCancelOk): { - foreach (var handlerAgent in availableHandlers) - await handlerAgent.Stop(); + await processManager.Request(new BasicCancel(state.ConsumerTag, NoWait: false)) + .ContinueWith( + onCompleted: async _ => { + foreach (var handlerAgent in state.AvailableHandlers) + await handlerAgent.Stop(); + + foreach (var handlerAgent in state.BusyHandlers) + await handlerAgent.Stop(); - foreach (var handlerAgent in busyHandlers) - await handlerAgent.Stop(); + await state.Assembler.Stop(); - await assembler.Stop(); + await context.Self.StopAsync(); + }, + onFaulted: fault => { + // TODO + } + ); - await context.Self.StopAsync(); - return context; - } - case Result.Fault(var exceptionDispatchInfo): { - // TODO - break; - } - } return context; } default: throw new Exception($"Unexpected message '{context.Message.GetType().FullName}' in '{nameof(Running)}' behaviour."); diff --git a/Lapine.Core/Agents/ConsumerAgent.Wrapper.cs b/Lapine.Core/Agents/ConsumerAgent.Wrapper.cs index d91cade..cced2bd 100644 --- a/Lapine.Core/Agents/ConsumerAgent.Wrapper.cs +++ b/Lapine.Core/Agents/ConsumerAgent.Wrapper.cs @@ -5,7 +5,7 @@ namespace Lapine.Agents; static partial class ConsumerAgent { class Wrapper(IAgent agent) : IConsumerAgent { - async Task IConsumerAgent.StartConsuming(String consumerTag, IObservable frameStream, IDispatcherAgent dispatcherAgent, String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments) => + async Task IConsumerAgent.StartConsuming(String consumerTag, IObservable frameStream, IDispatcherAgent dispatcherAgent, String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments) => await agent.PostAndReplyAsync(replyChannel => new StartConsuming(this, consumerTag, frameStream, dispatcherAgent, queue, consumerConfiguration, arguments, replyChannel)); async Task IConsumerAgent.HandlerReady(IMessageHandlerAgent handler) => diff --git a/Lapine.Core/Agents/ConsumerAgent.cs b/Lapine.Core/Agents/ConsumerAgent.cs index 432a484..45a0b7f 100644 --- a/Lapine.Core/Agents/ConsumerAgent.cs +++ b/Lapine.Core/Agents/ConsumerAgent.cs @@ -4,7 +4,7 @@ namespace Lapine.Agents; using Lapine.Protocol; interface IConsumerAgent { - Task StartConsuming(String consumerTag, IObservable frameStream, IDispatcherAgent dispatcherAgent, String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments = null); + Task StartConsuming(String consumerTag, IObservable frameStream, IDispatcherAgent dispatcherAgent, String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments = null); Task HandlerReady(IMessageHandlerAgent handler); } diff --git a/Lapine.Core/Agents/GetMessageAgent.Behaviours.cs b/Lapine.Core/Agents/GetMessageAgent.Behaviours.cs index d363a44..19dc2b9 100644 --- a/Lapine.Core/Agents/GetMessageAgent.Behaviours.cs +++ b/Lapine.Core/Agents/GetMessageAgent.Behaviours.cs @@ -32,7 +32,7 @@ await dispatcher.Dispatch(new BasicGet( } }; - static Behaviour AwaitingBasicGetOkOrEmpty(IDisposable subscription, CancellationTokenRegistration cancelTimeout, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingBasicGetOkOrEmpty(IDisposable subscription, CancellationTokenRegistration cancelTimeout, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { case FrameReceived(BasicGetEmpty): { @@ -49,14 +49,14 @@ static Behaviour AwaitingBasicGetOkOrEmpty(IDisposable subscription, C }; } case Timeout: { - replyChannel.Reply(new TimeoutException()); + replyChannel.Fault(new TimeoutException()); await context.Self.StopAsync(); subscription.Dispose(); return context; } case FrameReceived(ChannelClose close): { await cancelTimeout.DisposeAsync(); - replyChannel.Reply(AmqpException.Create(close.ReplyCode, close.ReplyText)); + replyChannel.Fault(AmqpException.Create(close.ReplyCode, close.ReplyText)); await context.Self.StopAsync(); subscription.Dispose(); return context; @@ -65,7 +65,7 @@ static Behaviour AwaitingBasicGetOkOrEmpty(IDisposable subscription, C } }; - static Behaviour AwaitingContentHeader(IDisposable subscription, CancellationTokenRegistration cancelTimeout, DeliveryInfo deliveryInfo, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingContentHeader(IDisposable subscription, CancellationTokenRegistration cancelTimeout, DeliveryInfo deliveryInfo, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { case FrameReceived(ContentHeader { BodySize: 0 } header): { @@ -81,14 +81,14 @@ static Behaviour AwaitingContentHeader(IDisposable subscription, Cance }; } case FrameReceived(TimeoutException timeout): { - replyChannel.Reply(timeout); + replyChannel.Fault(timeout); await context.Self.StopAsync(); subscription.Dispose(); return context; } case FrameReceived(ChannelClose close): { await cancelTimeout.DisposeAsync(); - replyChannel.Reply(AmqpException.Create(close.ReplyCode, close.ReplyText)); + replyChannel.Fault(AmqpException.Create(close.ReplyCode, close.ReplyText)); await context.Self.StopAsync(); subscription.Dispose(); return context; @@ -97,7 +97,7 @@ static Behaviour AwaitingContentHeader(IDisposable subscription, Cance } }; - static Behaviour AwaitingContentBody(IDisposable subscription, CancellationTokenRegistration cancelTimeout, DeliveryInfo deliveryInfo, ContentHeader header, AsyncReplyChannel replyChannel) { + static Behaviour AwaitingContentBody(IDisposable subscription, CancellationTokenRegistration cancelTimeout, DeliveryInfo deliveryInfo, ContentHeader header, AsyncReplyChannel replyChannel) { var buffer = new MemoryBufferWriter((Int32)header.BodySize); return async context => { @@ -113,7 +113,7 @@ static Behaviour AwaitingContentBody(IDisposable subscription, Cancell return context; } case Timeout: { - replyChannel.Reply(new TimeoutException()); + replyChannel.Fault(new TimeoutException()); await context.Self.StopAsync(); subscription.Dispose(); return context; diff --git a/Lapine.Core/Agents/GetMessageAgent.Protocol.cs b/Lapine.Core/Agents/GetMessageAgent.Protocol.cs index c925855..723545d 100644 --- a/Lapine.Core/Agents/GetMessageAgent.Protocol.cs +++ b/Lapine.Core/Agents/GetMessageAgent.Protocol.cs @@ -6,7 +6,7 @@ namespace Lapine.Agents; static partial class GetMessageAgent { abstract record Protocol; - record GetMessage(String Queue, Acknowledgements Acknowledgements, AsyncReplyChannel ReplyChannel) : Protocol; + record GetMessage(String Queue, Acknowledgements Acknowledgements, AsyncReplyChannel ReplyChannel) : Protocol; record FrameReceived(Object Frame) : Protocol; record Timeout : Protocol; } diff --git a/Lapine.Core/Agents/GetMessageAgent.Wrapper.cs b/Lapine.Core/Agents/GetMessageAgent.Wrapper.cs index 5c287ac..87f995c 100644 --- a/Lapine.Core/Agents/GetMessageAgent.Wrapper.cs +++ b/Lapine.Core/Agents/GetMessageAgent.Wrapper.cs @@ -5,14 +5,7 @@ namespace Lapine.Agents; static partial class GetMessageAgent { class Wrapper(IAgent agent) : IGetMessageAgent { async Task IGetMessageAgent.GetMessages(String queue, Acknowledgements acknowledgements) { - switch (await agent.PostAndReplyAsync(replyChannel => new GetMessage(queue, acknowledgements, replyChannel))) { - case GetMessageResult reply: - return reply; - case Exception fault: - throw fault; - default: - throw new Exception("Unexpected return value"); - } + return await agent.PostAndReplyAsync(replyChannel => new GetMessage(queue, acknowledgements, replyChannel)); } } } diff --git a/Lapine.Core/Agents/HandshakeAgent.Behaviours.cs b/Lapine.Core/Agents/HandshakeAgent.Behaviours.cs index 65730b8..4f47ae2 100644 --- a/Lapine.Core/Agents/HandshakeAgent.Behaviours.cs +++ b/Lapine.Core/Agents/HandshakeAgent.Behaviours.cs @@ -32,12 +32,12 @@ static Behaviour Unstarted(IObservable receivedFrames, IObse } }; - static Behaviour AwaitingConnectionStart(ConnectionConfiguration connectionConfiguration, CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventsSubscription, IDispatcherAgent dispatcher, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingConnectionStart(ConnectionConfiguration connectionConfiguration, CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventsSubscription, IDispatcherAgent dispatcher, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { case FrameReceived(ConnectionStart message) when !message.Mechanisms.Contains(connectionConfiguration.AuthenticationStrategy.Mechanism): { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new HandshakeFailed(new Exception($"Requested authentication mechanism '{connectionConfiguration.AuthenticationStrategy.Mechanism}' is not supported by the broker. This broker supports {String.Join(", ", message.Mechanisms)}"))); + replyChannel.Fault(new Exception($"Requested authentication mechanism '{connectionConfiguration.AuthenticationStrategy.Mechanism}' is not supported by the broker. This broker supports {String.Join(", ", message.Mechanisms)}")); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); @@ -45,14 +45,14 @@ static Behaviour AwaitingConnectionStart(ConnectionConfiguration conne } case FrameReceived(ConnectionStart(var version, var serverProperties, var mechanisms, var locales)) when !locales.Contains(connectionConfiguration.Locale): { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new HandshakeFailed(new Exception($"Requested locale '{connectionConfiguration.Locale}' is not supported by the broker. This broker supports {String.Join(", ", locales)}"))); + replyChannel.Fault(new Exception($"Requested locale '{connectionConfiguration.Locale}' is not supported by the broker. This broker supports {String.Join(", ", locales)}")); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); return context; } case Timeout(var exception): { - replyChannel.Reply(new HandshakeFailed(exception)); + replyChannel.Fault(exception); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); @@ -77,18 +77,18 @@ await dispatcher.Dispatch(new ConnectionStartOk( } }; - static Behaviour AwaitingConnectionSecureOrTune(ConnectionConfiguration connectionConfiguration, CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventsSubscription, Byte authenticationStage, IReadOnlyDictionary serverProperties, IDispatcherAgent dispatcher, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingConnectionSecureOrTune(ConnectionConfiguration connectionConfiguration, CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventsSubscription, Byte authenticationStage, IReadOnlyDictionary serverProperties, IDispatcherAgent dispatcher, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { case ConnectionEventReceived(RemoteDisconnected(var fault)): { - replyChannel.Reply(new HandshakeFailed(fault)); + replyChannel.Fault(fault); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); return context; } case Timeout(var exception): { - replyChannel.Reply(new HandshakeFailed(exception)); + replyChannel.Fault(exception); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventsSubscription.Dispose(); @@ -128,11 +128,11 @@ await dispatcher.Dispatch(new ConnectionOpen( } }; - static Behaviour AwaitingConnectionOpenOk(CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventSubscription, UInt16 maxChannelCount, UInt32 maxFrameSize, UInt16 heartbeatFrequency, IReadOnlyDictionary serverProperties, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingConnectionOpenOk(CancellationTokenRegistration scheduledTimeout, IDisposable frameSubscription, IDisposable connectionEventSubscription, UInt16 maxChannelCount, UInt32 maxFrameSize, UInt16 heartbeatFrequency, IReadOnlyDictionary serverProperties, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { case Timeout(var exception): { - replyChannel.Reply(new HandshakeFailed(exception)); + replyChannel.Fault(exception); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventSubscription.Dispose(); @@ -140,12 +140,12 @@ static Behaviour AwaitingConnectionOpenOk(CancellationTokenRegistratio } case FrameReceived(ConnectionOpenOk): { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new ConnectionAgreed(new ConnectionAgreement( + replyChannel.Reply(new ConnectionAgreement( MaxChannelCount : maxChannelCount, MaxFrameSize : maxFrameSize, HeartbeatFrequency: TimeSpan.FromSeconds(heartbeatFrequency), ServerProperties : serverProperties - ))); + )); await context.Self.StopAsync(); frameSubscription.Dispose(); connectionEventSubscription.Dispose(); diff --git a/Lapine.Core/Agents/HandshakeAgent.Protocol.cs b/Lapine.Core/Agents/HandshakeAgent.Protocol.cs index 059ddf5..8e749ab 100644 --- a/Lapine.Core/Agents/HandshakeAgent.Protocol.cs +++ b/Lapine.Core/Agents/HandshakeAgent.Protocol.cs @@ -1,11 +1,12 @@ namespace Lapine.Agents; using Lapine.Client; +using Lapine.Protocol; using Lapine.Protocol.Commands; static partial class HandshakeAgent { abstract record Protocol; - record StartHandshake(ConnectionConfiguration ConnectionConfiguration, AsyncReplyChannel ReplyChannel) : Protocol; + record StartHandshake(ConnectionConfiguration ConnectionConfiguration, AsyncReplyChannel ReplyChannel) : Protocol; record FrameReceived(ICommand Frame) : Protocol; record Timeout(TimeoutException Exception) : Protocol; record ConnectionEventReceived(Object Message) : Protocol; diff --git a/Lapine.Core/Agents/HandshakeAgent.Wrapper.cs b/Lapine.Core/Agents/HandshakeAgent.Wrapper.cs index aa9d4b5..9d3d077 100644 --- a/Lapine.Core/Agents/HandshakeAgent.Wrapper.cs +++ b/Lapine.Core/Agents/HandshakeAgent.Wrapper.cs @@ -1,12 +1,12 @@ namespace Lapine.Agents; using Lapine.Client; +using Lapine.Protocol; static partial class HandshakeAgent { class Wrapper(IAgent agent) : IHandshakeAgent { - async Task IHandshakeAgent.StartHandshake(ConnectionConfiguration connectionConfiguration) { - var reply = await agent.PostAndReplyAsync(replyChannel => new StartHandshake(connectionConfiguration, replyChannel)); - return (HandshakeResult) reply; + async Task IHandshakeAgent.StartHandshake(ConnectionConfiguration connectionConfiguration) { + return await agent.PostAndReplyAsync(replyChannel => new StartHandshake(connectionConfiguration, replyChannel)); } } } diff --git a/Lapine.Core/Agents/HandshakeAgent.cs b/Lapine.Core/Agents/HandshakeAgent.cs index 64e8f5f..0c79179 100644 --- a/Lapine.Core/Agents/HandshakeAgent.cs +++ b/Lapine.Core/Agents/HandshakeAgent.cs @@ -3,12 +3,8 @@ namespace Lapine.Agents; using Lapine.Client; using Lapine.Protocol; -abstract record HandshakeResult; -record ConnectionAgreed(ConnectionAgreement Agreement) : HandshakeResult; -record HandshakeFailed(Exception Fault) : HandshakeResult; - interface IHandshakeAgent { - Task StartHandshake(ConnectionConfiguration connectionConfiguration); + Task StartHandshake(ConnectionConfiguration connectionConfiguration); } static partial class HandshakeAgent { diff --git a/Lapine.Core/Agents/HeartbeatAgent.Protocol.cs b/Lapine.Core/Agents/HeartbeatAgent.Protocol.cs index 84a518c..6646e4c 100644 --- a/Lapine.Core/Agents/HeartbeatAgent.Protocol.cs +++ b/Lapine.Core/Agents/HeartbeatAgent.Protocol.cs @@ -9,7 +9,7 @@ record StartHeartbeat( IObservable ReceivedFrames, IDispatcherAgent Dispatcher, TimeSpan Frequency, - AsyncReplyChannel ReplyChannel + AsyncReplyChannel> ReplyChannel ) : Protocol; record Beat : Protocol; diff --git a/Lapine.Core/Agents/HeartbeatAgent.Wrapper.cs b/Lapine.Core/Agents/HeartbeatAgent.Wrapper.cs index 6e8fe3c..3660c3e 100644 --- a/Lapine.Core/Agents/HeartbeatAgent.Wrapper.cs +++ b/Lapine.Core/Agents/HeartbeatAgent.Wrapper.cs @@ -5,8 +5,7 @@ namespace Lapine.Agents; static partial class HeartbeatAgent { class Wrapper(IAgent agent) : IHeartbeatAgent { async Task> IHeartbeatAgent.Start(IObservable frameStream, IDispatcherAgent dispatcher, TimeSpan frequency) { - var reply = await agent.PostAndReplyAsync(replyChannel => new StartHeartbeat(frameStream, dispatcher, frequency, replyChannel)); - return (IObservable) reply; + return await agent.PostAndReplyAsync>(replyChannel => new StartHeartbeat(frameStream, dispatcher, frequency, replyChannel)); } async Task IHeartbeatAgent.Stop() => diff --git a/Lapine.Core/Agents/MessageAssemblerAgent.Protocol.cs b/Lapine.Core/Agents/MessageAssemblerAgent.Protocol.cs index 812e329..9ca4a11 100644 --- a/Lapine.Core/Agents/MessageAssemblerAgent.Protocol.cs +++ b/Lapine.Core/Agents/MessageAssemblerAgent.Protocol.cs @@ -1,10 +1,11 @@ namespace Lapine.Agents; +using Lapine.Client; using Lapine.Protocol; static partial class MessageAssemblerAgent { abstract record Protocol; - record Begin(IObservable Frames, AsyncReplyChannel ReplyChannel) : Protocol; + record Begin(IObservable Frames, AsyncReplyChannel Buffer)>> ReplyChannel) : Protocol; record Stop : Protocol; record FrameReceived(Object Frame) : Protocol; } diff --git a/Lapine.Core/Agents/MessageAssemblerAgent.Wrapper.cs b/Lapine.Core/Agents/MessageAssemblerAgent.Wrapper.cs index 060226a..401de56 100644 --- a/Lapine.Core/Agents/MessageAssemblerAgent.Wrapper.cs +++ b/Lapine.Core/Agents/MessageAssemblerAgent.Wrapper.cs @@ -6,8 +6,7 @@ namespace Lapine.Agents; static partial class MessageAssemblerAgent { class Wrapper(IAgent agent) : IMessageAssemblerAgent { async Task Buffer)>> IMessageAssemblerAgent.Begin(IObservable frameStream, IConsumerAgent parent) { - var reply = await agent.PostAndReplyAsync(replyChannel => new Begin(frameStream, replyChannel)); - return (IObservable<(DeliveryInfo DeliveryInfo, BasicProperties Properties, MemoryBufferWriter Buffer)>) reply; + return await agent.PostAndReplyAsync Buffer)>>(replyChannel => new Begin(frameStream, replyChannel)); } async Task IMessageAssemblerAgent.Stop() => diff --git a/Lapine.Core/Agents/PublishAgent.Behaviours.cs b/Lapine.Core/Agents/PublishAgent.Behaviours.cs index 3776ba4..183a058 100644 --- a/Lapine.Core/Agents/PublishAgent.Behaviours.cs +++ b/Lapine.Core/Agents/PublishAgent.Behaviours.cs @@ -1,7 +1,6 @@ namespace Lapine.Agents; using System.Reactive.Linq; -using System.Runtime.ExceptionServices; using Lapine.Client; using Lapine.Protocol; using Lapine.Protocol.Commands; @@ -37,7 +36,7 @@ await dispatcher.Dispatch(new ContentHeader( }; } else { - replyChannel.Reply(new Result.Ok(true)); + replyChannel.Complete(); await context.Self.StopAsync(); return context; } @@ -52,27 +51,27 @@ static Behaviour AwaitingPublisherConfirm(UInt64 deliveryTag, IDisposa switch (context.Message) { case FrameReceived(BasicAck ack) when ack.DeliveryTag == deliveryTag: { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new Result.Ok(true)); + replyChannel.Complete(); frameSubscription.Dispose(); await context.Self.StopAsync(); return context; } case FrameReceived(BasicNack nack) when nack.DeliveryTag == deliveryTag: { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(new AmqpException("Server rejected the message")))); // Why? + replyChannel.Fault(new AmqpException("Server rejected the message")); // Why? frameSubscription.Dispose(); await context.Self.StopAsync(); return context; } case Timeout: { - replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(new TimeoutException()))); + replyChannel.Fault(new TimeoutException()); frameSubscription.Dispose(); await context.Self.StopAsync(); return context; } case FrameReceived(ChannelClose close): { await scheduledTimeout.DisposeAsync(); - replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(AmqpException.Create(close.ReplyCode, close.ReplyText)))); + replyChannel.Fault(AmqpException.Create(close.ReplyCode, close.ReplyText)); frameSubscription.Dispose(); await context.Self.StopAsync(); return context; diff --git a/Lapine.Core/Agents/PublishAgent.Wrapper.cs b/Lapine.Core/Agents/PublishAgent.Wrapper.cs index fac9823..24c78ad 100644 --- a/Lapine.Core/Agents/PublishAgent.Wrapper.cs +++ b/Lapine.Core/Agents/PublishAgent.Wrapper.cs @@ -5,9 +5,8 @@ namespace Lapine.Agents; static partial class PublishAgent { class Wrapper(IAgent agent) : IPublishAgent { - async Task> IPublishAgent.Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties Properties, ReadOnlyMemory Body) message) { - var reply = await agent.PostAndReplyAsync(replyChannel => new PublishMessage(exchange, routingKey, routingFlags, message, replyChannel)); - return (Result) reply; + async Task IPublishAgent.Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties Properties, ReadOnlyMemory Body) message) { + await agent.PostAndReplyAsync(replyChannel => new PublishMessage(exchange, routingKey, routingFlags, message, replyChannel)); } } } diff --git a/Lapine.Core/Agents/PublishAgent.cs b/Lapine.Core/Agents/PublishAgent.cs index 1c3ce36..1c31603 100644 --- a/Lapine.Core/Agents/PublishAgent.cs +++ b/Lapine.Core/Agents/PublishAgent.cs @@ -4,7 +4,7 @@ namespace Lapine.Agents; using Lapine.Protocol; interface IPublishAgent { - Task> Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties Properties, ReadOnlyMemory Body) message); + Task Publish(String exchange, String routingKey, RoutingFlags routingFlags, (BasicProperties Properties, ReadOnlyMemory Body) message); } static partial class PublishAgent { diff --git a/Lapine.Core/Agents/RequestReplyAgent.Behaviours.cs b/Lapine.Core/Agents/RequestReplyAgent.Behaviours.cs index bc69093..afb3577 100644 --- a/Lapine.Core/Agents/RequestReplyAgent.Behaviours.cs +++ b/Lapine.Core/Agents/RequestReplyAgent.Behaviours.cs @@ -1,6 +1,5 @@ namespace Lapine.Agents; -using System.Runtime.ExceptionServices; using Lapine.Client; using Lapine.Protocol; using Lapine.Protocol.Commands; @@ -24,25 +23,25 @@ static Behaviour AwaitingRequest(IObservable receivedFrames, } }; - static Behaviour AwaitingReply(IDisposable framesSubscription, IDisposable scheduledTimeout, AsyncReplyChannel replyChannel) => + static Behaviour AwaitingReply(IDisposable framesSubscription, IDisposable scheduledTimeout, AsyncReplyChannel replyChannel) => async context => { switch (context.Message) { case OnFrameReceived(TReply reply): { - replyChannel.Reply(new Result.Ok(reply)); + replyChannel.Reply(reply); framesSubscription.Dispose(); scheduledTimeout.Dispose(); await context.Self.StopAsync(); return context; } case OnTimeout: { - replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(new TimeoutException()))); + replyChannel.Fault(new TimeoutException()); framesSubscription.Dispose(); scheduledTimeout.Dispose(); await context.Self.StopAsync(); return context; } case OnFrameReceived(ChannelClose(var replyCode, var replyText, _)): { - replyChannel.Reply(new Result.Fault(ExceptionDispatchInfo.Capture(AmqpException.Create(replyCode, replyText)))); + replyChannel.Fault(AmqpException.Create(replyCode, replyText)); framesSubscription.Dispose(); scheduledTimeout.Dispose(); await context.Self.StopAsync(); diff --git a/Lapine.Core/Agents/RequestReplyAgent.Protocol.cs b/Lapine.Core/Agents/RequestReplyAgent.Protocol.cs index a4c82a4..e3f89c8 100644 --- a/Lapine.Core/Agents/RequestReplyAgent.Protocol.cs +++ b/Lapine.Core/Agents/RequestReplyAgent.Protocol.cs @@ -4,7 +4,7 @@ namespace Lapine.Agents; static partial class RequestReplyAgent where TRequest : ICommand where TReply : ICommand { abstract record Protocol; - record SendRequest(TRequest Request, AsyncReplyChannel ReplyChannel) : Protocol; + record SendRequest(TRequest Request, AsyncReplyChannel ReplyChannel) : Protocol; record OnFrameReceived(ICommand Command) : Protocol; record OnTimeout : Protocol; } diff --git a/Lapine.Core/Agents/RequestReplyAgent.Wrapper.cs b/Lapine.Core/Agents/RequestReplyAgent.Wrapper.cs index 8729280..12ee3d7 100644 --- a/Lapine.Core/Agents/RequestReplyAgent.Wrapper.cs +++ b/Lapine.Core/Agents/RequestReplyAgent.Wrapper.cs @@ -4,9 +4,8 @@ namespace Lapine.Agents; static partial class RequestReplyAgent where TRequest : ICommand where TReply : ICommand { class Wrapper(IAgent agent) : IRequestReplyAgent { - async Task> IRequestReplyAgent.Request(TRequest request) { - var reply = (Result) await agent.PostAndReplyAsync(replyChannel => new SendRequest(request, replyChannel)); - return reply; + async Task IRequestReplyAgent.Request(TRequest request) { + return await agent.PostAndReplyAsync(replyChannel => new SendRequest(request, replyChannel)); } } } diff --git a/Lapine.Core/Agents/RequestReplyAgent.cs b/Lapine.Core/Agents/RequestReplyAgent.cs index 1f60363..fcbf041 100644 --- a/Lapine.Core/Agents/RequestReplyAgent.cs +++ b/Lapine.Core/Agents/RequestReplyAgent.cs @@ -6,7 +6,7 @@ namespace Lapine.Agents; interface IRequestReplyAgent where TRequest : ICommand where TReply : ICommand { - Task> Request(TRequest request); + Task Request(TRequest request); } static partial class RequestReplyAgent where TRequest : ICommand where TReply : ICommand { diff --git a/Lapine.Core/Agents/SocketAgent.Behaviours.cs b/Lapine.Core/Agents/SocketAgent.Behaviours.cs index c966dce..a6ecb28 100644 --- a/Lapine.Core/Agents/SocketAgent.Behaviours.cs +++ b/Lapine.Core/Agents/SocketAgent.Behaviours.cs @@ -15,26 +15,27 @@ static Behaviour Disconnected() => switch (context.Message) { case Connect(var endpoint, var replyChannel, var cancellationToken): { var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + try { await socket.ConnectAsync(endpoint, cancellationToken); var events = new Subject(); var receivedFrames = new Subject(); - replyChannel.Reply(new Connected(events, receivedFrames)); + replyChannel.Reply((events, receivedFrames)); // Begin polling... await context.Self.PostAsync(new Poll()); - return context with { Behaviour = ConnectedBehaviour(socket, events, receivedFrames) }; + return context with { Behaviour = Connected(socket, events, receivedFrames) }; } - catch (OperationCanceledException) { - replyChannel.Reply(new ConnectionFailed(new TimeoutException())); + catch (OperationCanceledException cancelled) when (cancelled.CancellationToken == cancellationToken) { + replyChannel.Fault(new TimeoutException()); await context.Self.StopAsync(); return context; } catch (Exception fault) { - replyChannel.Reply(new ConnectionFailed(fault)); + replyChannel.Fault(fault); await context.Self.StopAsync(); return context; } @@ -43,7 +44,7 @@ static Behaviour Disconnected() => } }; - static Behaviour ConnectedBehaviour(Socket socket, Subject connectionEvents, Subject receivedFrames) { + static Behaviour Connected(Socket socket, Subject connectionEvents, Subject receivedFrames) { var transmitBuffer = new MemoryBufferWriter(4096); var (frameBuffer, tail) = (new Byte[DefaultMaximumFrameSize], 0); diff --git a/Lapine.Core/Agents/SocketAgent.Protocol.cs b/Lapine.Core/Agents/SocketAgent.Protocol.cs index 0ad793b..bf7b68b 100644 --- a/Lapine.Core/Agents/SocketAgent.Protocol.cs +++ b/Lapine.Core/Agents/SocketAgent.Protocol.cs @@ -1,10 +1,11 @@ namespace Lapine.Agents; using System.Net; +using Lapine.Protocol; static partial class SocketAgent { abstract record Protocol; - record Connect(IPEndPoint Endpoint, AsyncReplyChannel ReplyChannel, CancellationToken CancellationToken = default) : Protocol; + record Connect(IPEndPoint Endpoint, AsyncReplyChannel<(IObservable ConnectionEvents, IObservable ReceivedFrames)> ReplyChannel, CancellationToken CancellationToken = default) : Protocol; record Tune(UInt32 MaxFrameSize) : Protocol; record EnableTcpKeepAlives(TimeSpan ProbeTime, TimeSpan RetryInterval, Int32 RetryCount) : Protocol; record Transmit(ISerializable Entity) : Protocol; diff --git a/Lapine.Core/Agents/SocketAgent.Wrapper.cs b/Lapine.Core/Agents/SocketAgent.Wrapper.cs index 6ebf8bb..97e555a 100644 --- a/Lapine.Core/Agents/SocketAgent.Wrapper.cs +++ b/Lapine.Core/Agents/SocketAgent.Wrapper.cs @@ -1,12 +1,12 @@ namespace Lapine.Agents; using System.Net; +using Lapine.Protocol; static partial class SocketAgent { class Wrapper(IAgent agent) : ISocketAgent { - async Task ISocketAgent.ConnectAsync(IPEndPoint endpoint, CancellationToken cancellationToken) { - var reply = await agent.PostAndReplyAsync(replyChannel => new Connect(endpoint, replyChannel, cancellationToken)); - return (ConnectResult) reply; + async Task<(IObservable ConnectionEvents, IObservable ReceivedFrames)> ISocketAgent.ConnectAsync(IPEndPoint endpoint, CancellationToken cancellationToken) { + return await agent.PostAndReplyAsync<(IObservable ConnectionEvents, IObservable ReceivedFrames)>(replyChannel => new Connect(endpoint, replyChannel, cancellationToken)); } async Task ISocketAgent.Tune(UInt32 maxFrameSize) => diff --git a/Lapine.Core/Agents/SocketAgent.cs b/Lapine.Core/Agents/SocketAgent.cs index 50f9edd..f679b7c 100644 --- a/Lapine.Core/Agents/SocketAgent.cs +++ b/Lapine.Core/Agents/SocketAgent.cs @@ -6,12 +6,8 @@ namespace Lapine.Agents; abstract record ConnectionEvent; record RemoteDisconnected(Exception Fault) : ConnectionEvent; -abstract record ConnectResult; -record Connected(IObservable ConnectionEvents, IObservable ReceivedFrames) : ConnectResult; -record ConnectionFailed(Exception Fault) : ConnectResult; - interface ISocketAgent { - Task ConnectAsync(IPEndPoint endpoint, CancellationToken cancellationToken = default); + Task<(IObservable ConnectionEvents, IObservable ReceivedFrames)> ConnectAsync(IPEndPoint endpoint, CancellationToken cancellationToken = default); Task Tune(UInt32 maxFrameSize); Task EnableTcpKeepAlives(TimeSpan probeTime, TimeSpan retryInterval, Int32 retryCount); Task Transmit(ISerializable entity); diff --git a/Lapine.Core/Client/AmqpClient.cs b/Lapine.Core/Client/AmqpClient.cs index 14e3acc..fb25803 100644 --- a/Lapine.Core/Client/AmqpClient.cs +++ b/Lapine.Core/Client/AmqpClient.cs @@ -1,38 +1,20 @@ namespace Lapine.Client; -using System.Runtime.ExceptionServices; using Lapine.Agents; public class AmqpClient(ConnectionConfiguration connectionConfiguration) : IAsyncDisposable { readonly IAmqpClientAgent _agent = AmqpClientAgent.Create(); - public async ValueTask ConnectAsync(CancellationToken cancellationToken = default) { - switch (await _agent.EstablishConnection(connectionConfiguration, cancellationToken)) { - case true: { - return; - } - case Exception fault: { - ExceptionDispatchInfo - .Capture(fault) - .Throw(); - return; - } - } - } + public async ValueTask ConnectAsync(CancellationToken cancellationToken = default) => + await _agent.EstablishConnection(connectionConfiguration, cancellationToken); public async ValueTask OpenChannelAsync(CancellationToken cancellationToken = default) { - switch (await _agent.OpenChannel(cancellationToken)) { - case IChannelAgent channelAgent: { - return new Channel(channelAgent, connectionConfiguration); - } - case Exception fault: { - ExceptionDispatchInfo - .Capture(fault) - .Throw(); - return null; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(OpenChannelAsync)}' method."); - } + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(connectionConfiguration.CommandTimeout); + + var channelAgent = await _agent.OpenChannel(cts.Token); + + return new Channel(channelAgent, connectionConfiguration); } public async ValueTask DisposeAsync() { diff --git a/Lapine.Core/Client/Channel.cs b/Lapine.Core/Client/Channel.cs index 445c3a9..afdb974 100644 --- a/Lapine.Core/Client/Channel.cs +++ b/Lapine.Core/Client/Channel.cs @@ -2,7 +2,6 @@ namespace Lapine.Client; using System.ComponentModel; using Lapine.Agents; -using Lapine.Protocol; public class Channel { readonly IChannelAgent _agent; @@ -22,16 +21,8 @@ public async ValueTask CloseAsync(CancellationToken cancellationToken = default) using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.Close(cts.Token)) { - case true: { - _closed = true; - break; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(CloseAsync)}' method."); - } + await _agent.Close(cts.Token); + _closed = true; } public async ValueTask DeclareExchangeAsync(ExchangeDefinition definition, CancellationToken cancellationToken = default) { @@ -41,15 +32,7 @@ public async ValueTask DeclareExchangeAsync(ExchangeDefinition definition, Cance using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.DeclareExchange(definition, cts.Token)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(DeclareExchangeAsync)}' method."); - } + await _agent.DeclareExchange(definition, cts.Token); } public async ValueTask DeleteExchangeAsync(String exchange, DeleteExchangeCondition condition = DeleteExchangeCondition.None, CancellationToken cancellationToken = default) { @@ -59,15 +42,7 @@ public async ValueTask DeleteExchangeAsync(String exchange, DeleteExchangeCondit using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.DeleteExchange(exchange, condition, cts.Token)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(DeleteExchangeAsync)}' method."); - } + await _agent.DeleteExchange(exchange, condition, cts.Token); } public async ValueTask DeclareQueueAsync(QueueDefinition definition, CancellationToken cancellationToken = default) { @@ -77,15 +52,7 @@ public async ValueTask DeclareQueueAsync(QueueDefinition definition, Cancellatio using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.DeclareQueue(definition, cts.Token)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(DeclareQueueAsync)}' method."); - } + await _agent.DeclareQueue(definition, cts.Token); } public async ValueTask DeleteQueueAsync(String queue, DeleteQueueCondition condition = DeleteQueueCondition.None, CancellationToken cancellationToken = default) { @@ -95,15 +62,7 @@ public async ValueTask DeleteQueueAsync(String queue, DeleteQueueCondition condi using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.DeleteQueue(queue, condition, cts.Token)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(DeleteQueueAsync)}' method."); - } + await _agent.DeleteQueue(queue, condition, cts.Token); } public async ValueTask BindQueueAsync(Binding binding, CancellationToken cancellationToken = default) { @@ -113,15 +72,7 @@ public async ValueTask BindQueueAsync(Binding binding, CancellationToken cancell using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.BindQueue(binding, cts.Token)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(BindQueueAsync)}' method."); - } + await _agent.BindQueue(binding, cts.Token); } public async ValueTask UnbindQueueAsync(Binding binding, CancellationToken cancellationToken = default) { @@ -131,15 +82,7 @@ public async ValueTask UnbindQueueAsync(Binding binding, CancellationToken cance using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.UnbindQueue(binding, cts.Token)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(UnbindQueueAsync)}' method."); - } + await _agent.UnbindQueue(binding, cts.Token); } public async ValueTask PurgeQueueAsync(String queue, CancellationToken cancellationToken = default) { @@ -149,15 +92,7 @@ public async ValueTask PurgeQueueAsync(String queue, CancellationToken c using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.PurgeQueue(queue, cts.Token)) { - case UInt32 messageCount: { - return messageCount; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(PurgeQueueAsync)}' method."); - } + return await _agent.PurgeQueue(queue, cts.Token); } public async ValueTask PublishAsync(String exchange, String routingKey, (MessageProperties Properties, ReadOnlyMemory Payload) message, RoutingFlags routingFlags = RoutingFlags.None, CancellationToken cancellationToken = default) { @@ -167,15 +102,7 @@ public async ValueTask PublishAsync(String exchange, String routingKey, (Message using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.Publish(exchange, routingKey, routingFlags, (message.Properties.ToBasicProperties(), message.Payload), cts.Token)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message2: throw new Exception($"Unexpected message '{message2.GetType().FullName}' in '{nameof(PublishAsync)}' method."); - } + await _agent.Publish(exchange, routingKey, routingFlags, (message.Properties.ToBasicProperties(), message.Payload), cts.Token); } public async ValueTask<(DeliveryInfo Delivery, MessageProperties Properties, ReadOnlyMemory Body)?> GetMessageAsync(String queue, Acknowledgements acknowledgements, CancellationToken cancellationToken = default) { @@ -185,14 +112,14 @@ public async ValueTask PublishAsync(String exchange, String routingKey, (Message using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.GetMessage(queue, acknowledgements, cts.Token)) { - case (DeliveryInfo delivery, BasicProperties properties, ReadOnlyMemory body): { - return (delivery, MessageProperties.FromBasicProperties(properties), body); - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(GetMessageAsync)}' method."); + var message = await _agent.GetMessage(queue, acknowledgements, cts.Token); + + if (message.HasValue) { + var (delivery, properties, body) = message.Value; + return (delivery, MessageProperties.FromBasicProperties(properties), body); + } + else { + return null; } } @@ -200,30 +127,14 @@ public async ValueTask AcknowledgeAsync(UInt64 deliveryTag, Boolean multiple = f if (_closed) throw new InvalidOperationException("Channel is closed."); - switch (await _agent.Acknowledge(deliveryTag, multiple)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(AcknowledgeAsync)}' method."); - } + await _agent.Acknowledge(deliveryTag, multiple); } public async ValueTask RejectAsync(UInt64 deliveryTag, Boolean requeue) { if (_closed) throw new InvalidOperationException("Channel is closed."); - switch (await _agent.Reject(deliveryTag, requeue)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(RejectAsync)}' method."); - } + await _agent.Reject(deliveryTag, requeue); } public async ValueTask SetPrefetchLimitAsync(UInt16 limit, PrefetchLimitScope scope = PrefetchLimitScope.Consumer, CancellationToken cancellationToken = default) { @@ -233,19 +144,11 @@ public async ValueTask SetPrefetchLimitAsync(UInt16 limit, PrefetchLimitScope sc using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.SetPrefetchLimit(limit, scope switch { - PrefetchLimitScope.Consumer => false, - PrefetchLimitScope.Channel => true, - _ => throw new InvalidEnumArgumentException(nameof(scope), (Int32)scope, typeof(PrefetchLimitScope)) - }, cts.Token)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(SetPrefetchLimitAsync)}' method."); - } + await _agent.SetPrefetchLimit(limit, scope switch { + PrefetchLimitScope.Consumer => false, + PrefetchLimitScope.Channel => true, + _ => throw new InvalidEnumArgumentException(nameof(scope), (Int32)scope, typeof(PrefetchLimitScope)) + }, cts.Token); } public async ValueTask ConsumeAsync(String queue, ConsumerConfiguration consumerConfiguration, IReadOnlyDictionary? arguments = null, CancellationToken cancellationToken = default) { @@ -255,15 +158,7 @@ public async ValueTask ConsumeAsync(String queue, ConsumerConfiguration using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.Consume(queue, consumerConfiguration, arguments, cts.Token)) { - case String consumerTag: { - return consumerTag; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(ConsumeAsync)}' method."); - } + return await _agent.Consume(queue, consumerConfiguration, arguments, cts.Token); } public async ValueTask EnablePublisherConfirms(CancellationToken cancellationToken = default) { @@ -273,14 +168,6 @@ public async ValueTask EnablePublisherConfirms(CancellationToken cancellationTok using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_connectionConfiguration.CommandTimeout); - switch (await _agent.EnablePublisherConfirms(cts.Token)) { - case true: { - return; - } - case Exception fault: { - throw fault; - } - case var message: throw new Exception($"Unexpected message '{message.GetType().FullName}' in '{nameof(EnablePublisherConfirms)}' method."); - } + await _agent.EnablePublisherConfirms(cts.Token); } } diff --git a/Lapine.Core/Result.cs b/Lapine.Core/Result.cs deleted file mode 100644 index 666cbdf..0000000 --- a/Lapine.Core/Result.cs +++ /dev/null @@ -1,26 +0,0 @@ -namespace Lapine; - -using System.Runtime.ExceptionServices; - -abstract record Result { - public sealed record Ok(T Value) : Result; - public sealed record Fault(ExceptionDispatchInfo ExceptionDispatchInfo) : Result; - - public Result Map(Func fn) => this switch { - Ok(var value) => new Result.Ok(fn(value)), - Fault(var dispatchInfo) => new Result.Fault(dispatchInfo) - }; - - public T ValueOrThrow() { - switch (this) { - case Ok (var value): - return value; - case Fault (var dispatchInfo): { - dispatchInfo.Throw(); - throw dispatchInfo.SourceException; - } - default: - throw new Exception("Impossible!"); - } - } -} diff --git a/Lapine.Core/TaskExtensions.cs b/Lapine.Core/TaskExtensions.cs new file mode 100644 index 0000000..922e3e8 --- /dev/null +++ b/Lapine.Core/TaskExtensions.cs @@ -0,0 +1,108 @@ +namespace Lapine; + +static class TaskExtensions { + static public Task ContinueWith(this Task task, Action onCompleted, Action onFaulted) { + ArgumentNullException.ThrowIfNull(task); + ArgumentNullException.ThrowIfNull(onCompleted); + ArgumentNullException.ThrowIfNull(onFaulted); + + return task.ContinueWith(antecedent => { + if (antecedent.IsCompletedSuccessfully) { + onCompleted(); + } + else { + onFaulted(antecedent.Exception!.InnerException!); + } + }, TaskContinuationOptions.ExecuteSynchronously); + } + + static public Task ContinueWith(this Task task, Action onCompleted, Action onFaulted) { + ArgumentNullException.ThrowIfNull(task); + ArgumentNullException.ThrowIfNull(onCompleted); + ArgumentNullException.ThrowIfNull(onFaulted); + + return task.ContinueWith(antecedent => { + if (antecedent.IsCompletedSuccessfully) { + onCompleted(task.Result); + } + else { + onFaulted(antecedent.Exception!.InnerException!); + } + }, TaskContinuationOptions.ExecuteSynchronously); + } + + static public Task ContinueWith(this Task task, Func onCompleted, Action onFaulted) { + ArgumentNullException.ThrowIfNull(task); + ArgumentNullException.ThrowIfNull(onCompleted); + ArgumentNullException.ThrowIfNull(onFaulted); + + return task.ContinueWith(async antecedent => { + if (antecedent.IsCompletedSuccessfully) { + await onCompleted(); + } + else { + onFaulted(antecedent.Exception!.InnerException!); + } + }).Unwrap(); + } + + static public Task ContinueWith(this Task task, Func onCompleted, Action onFaulted) { + ArgumentNullException.ThrowIfNull(task); + ArgumentNullException.ThrowIfNull(onCompleted); + ArgumentNullException.ThrowIfNull(onFaulted); + + return task.ContinueWith(async antecedent => { + if (antecedent.IsCompletedSuccessfully) { + await onCompleted(antecedent.Result); + } + else { + onFaulted(antecedent.Exception!.InnerException!); + } + }).Unwrap(); + } + + static public Task ContinueWith(this Task task, Func onCompleted, Func onFaulted) { + ArgumentNullException.ThrowIfNull(task); + ArgumentNullException.ThrowIfNull(onCompleted); + ArgumentNullException.ThrowIfNull(onFaulted); + + return task.ContinueWith(antecedent => { + if (antecedent.IsCompletedSuccessfully) { + return Task.FromResult(onCompleted()); + } + else { + return Task.FromResult(onFaulted(antecedent.Exception!.InnerException!)); + } + }, TaskContinuationOptions.ExecuteSynchronously).Unwrap(); + } + + static public Task ContinueWith(this Task task, Func onCompleted, Func onFaulted) { + ArgumentNullException.ThrowIfNull(task); + ArgumentNullException.ThrowIfNull(onCompleted); + ArgumentNullException.ThrowIfNull(onFaulted); + + return task.ContinueWith(antecedent => { + if (antecedent.IsCompletedSuccessfully) { + return Task.FromResult(onCompleted(task.Result)); + } + else { + return Task.FromResult(onFaulted(antecedent.Exception!.InnerException!)); + } + }, TaskContinuationOptions.ExecuteSynchronously).Unwrap(); + } + + static public Task ContinueWith(this Task task, Func> onCompleted, Func onFaulted) { + ArgumentNullException.ThrowIfNull(task); + ArgumentNullException.ThrowIfNull(onCompleted); + ArgumentNullException.ThrowIfNull(onFaulted); + + return task.ContinueWith(async antecedent => { + if (antecedent.IsCompletedSuccessfully) { + return await onCompleted(task.Result); + } + else { + return onFaulted(antecedent.Exception!.InnerException!); + } + }, TaskContinuationOptions.ExecuteSynchronously).Unwrap(); + } +}