Skip to content

Commit

Permalink
Introduced an async consumer dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Feb 4, 2017
1 parent e414b85 commit c393768
Show file tree
Hide file tree
Showing 11 changed files with 729 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
using System;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;

namespace RabbitMQ.Client
{
public class AsyncDefaultBasicConsumer : IAsyncBasicConsumer
{
public readonly object m_eventLock = new object();
public AsyncEventHandler<ConsumerEventArgs> m_consumerCancelled;

/// <summary>
/// Creates a new instance of an <see cref="DefaultBasicConsumer"/>.
/// </summary>
public AsyncDefaultBasicConsumer()
{
ShutdownReason = null;
Model = null;
IsRunning = false;
ConsumerTag = null;
}

/// <summary>
/// Constructor which sets the Model property to the given value.
/// </summary>
/// <param name="model">Common AMQP model.</param>
public AsyncDefaultBasicConsumer(IModel model)
{
ShutdownReason = null;
IsRunning = false;
ConsumerTag = null;
Model = model;
}

/// <summary>
/// Retrieve the consumer tag this consumer is registered as; to be used when discussing this consumer
/// with the server, for instance with <see cref="IModel.BasicCancel"/>.
/// </summary>
public string ConsumerTag { get; set; }

/// <summary>
/// Returns true while the consumer is registered and expecting deliveries from the broker.
/// </summary>
public bool IsRunning { get; protected set; }

/// <summary>
/// If our <see cref="IModel"/> shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
/// </summary>
public ShutdownEventArgs ShutdownReason { get; protected set; }

/// <summary>
/// Signalled when the consumer gets cancelled.
/// </summary>
public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled
{
add
{
lock (m_eventLock)
{
m_consumerCancelled += value;
}
}
remove
{
lock (m_eventLock)
{
m_consumerCancelled -= value;
}
}
}

/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IModel Model { get; set; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
/// e.g. the queue has been deleted (either by this channel or by any other channel).
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual async Task HandleBasicCancel(string consumerTag)
{
await OnCancel().ConfigureAwait(false);
}

/// <summary>
/// Called upon successful deregistration of the consumer from the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual async Task HandleBasicCancelOk(string consumerTag)
{
await OnCancel().ConfigureAwait(false);
}

/// <summary>
/// Called upon successful registration of the consumer with the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicConsumeOk(string consumerTag)
{
ConsumerTag = consumerTag;
IsRunning = true;
return TaskExtensions.CompletedTask;
}

/// <summary>
/// Called each time a message arrives for this consumer.
/// </summary>
/// <remarks>
/// Does nothing with the passed in information.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
/// The implementation of this method in this class does NOT acknowledge such messages.
/// </remarks>
public virtual Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
// Nothing to do here.
return TaskExtensions.CompletedTask;
}

/// <summary>
/// Called when the model shuts down.
/// </summary>
/// <param name="model"> Common AMQP model.</param>
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
public virtual async Task HandleModelShutdown(object model, ShutdownEventArgs reason)
{
ShutdownReason = reason;
await OnCancel().ConfigureAwait(false);
}

/// <summary>
/// Default implementation - overridable in subclasses.</summary>
/// <remarks>
/// This default implementation simply sets the <see cref="IsRunning"/>
/// property to false, and takes no further action.
/// </remarks>
public virtual async Task OnCancel()
{
IsRunning = false;
AsyncEventHandler<ConsumerEventArgs> handler;
lock (m_eventLock)
{
handler = m_consumerCancelled;
}
if (handler != null)
{
foreach (AsyncEventHandler<ConsumerEventArgs> h in handler.GetInvocationList())
{
await h(this, new ConsumerEventArgs(ConsumerTag)).ConfigureAwait(false);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
/// </summary>
public bool AutomaticRecoveryEnabled { get; set; } = true;

/// <summary>
/// Set to true will enable a asynchronous consumer dispatcher which is compatible with <see cref="IAsyncBasicConsumer"/>.
/// Defaults to false.
/// </summary>
public bool DispatchConsumersAsync { get; set; } = false;

/// <summary>The host to connect to.</summary>
public string HostName { get; set; } = "localhost";

Expand Down Expand Up @@ -475,7 +481,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, String c
throw new BrokerUnreachableException(e);
}

return conn;
return DispatchConsumersAsync ? new AsyncConnectionDecorator(conn) : conn;
}

public IFrameHandler CreateFrameHandler()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System.Threading.Tasks;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
{
public interface IAsyncBasicConsumer
{
/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
IModel Model { get; }

/// <summary>
/// Signalled when the consumer gets cancelled.
/// </summary>
event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled;

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
/// e.g. the queue has been deleted (either by this channel or by any other channel).
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicCancel(string consumerTag);

/// <summary>
/// Called upon successful deregistration of the consumer from the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicCancelOk(string consumerTag);

/// <summary>
/// Called upon successful registration of the consumer with the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicConsumeOk(string consumerTag);

/// <summary>
/// Called each time a message arrives for this consumer.
/// </summary>
/// <remarks>
/// Does nothing with the passed in information.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
/// The implementation of this method in this class does NOT acknowledge such messages.
/// </remarks>
Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body);

/// <summary>
/// Called when the model shuts down.
/// </summary>
/// <param name="model"> Common AMQP model.</param>
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
Task HandleModelShutdown(object model, ShutdownEventArgs reason);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace RabbitMQ.Client
{
interface IAsyncConnection : IConnection
{
AsyncConsumerWorkService AsyncConsumerWorkService { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
using System;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Events
{
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event) where TEvent : EventArgs;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System;
using System.Threading.Tasks;
using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;

namespace RabbitMQ.Client.Events
{
public class AsyncEventingBasicConsumer : AsyncDefaultBasicConsumer
{
///<summary>Constructor which sets the Model property to the
///given value.</summary>
public AsyncEventingBasicConsumer(IModel model) : base(model)
{
}

///<summary>Event fired on HandleBasicDeliver.</summary>
public event AsyncEventHandler<BasicDeliverEventArgs> Received;

///<summary>Event fired on HandleBasicConsumeOk.</summary>
public event AsyncEventHandler<ConsumerEventArgs> Registered;

///<summary>Event fired on HandleModelShutdown.</summary>
public event AsyncEventHandler<ShutdownEventArgs> Shutdown;

///<summary>Event fired on HandleBasicCancelOk.</summary>
public event AsyncEventHandler<ConsumerEventArgs> Unregistered;

///<summary>Fires the Unregistered event.</summary>
public override async Task HandleBasicCancelOk(string consumerTag)
{
await base.HandleBasicCancelOk(consumerTag).ConfigureAwait(false);
await Raise(Unregistered, new ConsumerEventArgs(consumerTag)).ConfigureAwait(false);
}

///<summary>Fires the Registered event.</summary>
public override async Task HandleBasicConsumeOk(string consumerTag)
{
await base.HandleBasicConsumeOk(consumerTag).ConfigureAwait(false);
await Raise(Registered, new ConsumerEventArgs(consumerTag)).ConfigureAwait(false);
}

///<summary>Fires the Received event.</summary>
public override async Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
await base.HandleBasicDeliver(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body).ConfigureAwait(false);
await Raise(Received, new BasicDeliverEventArgs(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body)).ConfigureAwait(false);
}

///<summary>Fires the Shutdown event.</summary>
public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason)
{
await base.HandleModelShutdown(model, reason).ConfigureAwait(false);
await Raise(Shutdown, reason).ConfigureAwait(false);
}

private Task Raise<TEvent>(AsyncEventHandler<TEvent> eventHandler, TEvent evt)
where TEvent : EventArgs
{
var handler = eventHandler;
if (handler != null)
{
return handler(this, evt);
}
return TaskExtensions.CompletedTask;
}
}
}
Loading

0 comments on commit c393768

Please sign in to comment.