Skip to content

Commit

Permalink
use async consumer only
Browse files Browse the repository at this point in the history
  • Loading branch information
bollhals authored and lukebakken committed Jul 29, 2024
1 parent 5b45d25 commit d0021aa
Show file tree
Hide file tree
Showing 57 changed files with 323 additions and 1,516 deletions.
52 changes: 6 additions & 46 deletions projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Benchmarks
{
internal sealed class AsyncBasicConsumerFake : IAsyncBasicConsumer, IBasicConsumer
internal sealed class AsyncBasicConsumerFake : IAsyncBasicConsumer
{
private readonly ManualResetEventSlim _autoResetEvent;
private int _current;
Expand All @@ -18,7 +17,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
_autoResetEvent = autoResetEvent;
}

public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
Expand All @@ -29,53 +28,14 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel
return Task.CompletedTask;
}

Task IBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
{
_current = 0;
_autoResetEvent.Set();
}
return Task.CompletedTask;
}
public Task HandleBasicCancelAsync(string consumerTag) => Task.CompletedTask;

public Task HandleBasicCancel(string consumerTag) => Task.CompletedTask;
public Task HandleBasicCancelOkAsync(string consumerTag) => Task.CompletedTask;

public Task HandleBasicCancelOk(string consumerTag) => Task.CompletedTask;
public Task HandleBasicConsumeOkAsync(string consumerTag) => Task.CompletedTask;

public Task HandleBasicConsumeOk(string consumerTag) => Task.CompletedTask;

public Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) => Task.CompletedTask;
public Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) => Task.CompletedTask;

public IChannel Channel { get; }

event EventHandler<ConsumerEventArgs> IBasicConsumer.ConsumerCancelled
{
add { }
remove { }
}

public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled
{
add { }
remove { }
}

void IBasicConsumer.HandleBasicCancelOk(string consumerTag)
{
}

void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
{
}

void IBasicConsumer.HandleChannelShutdown(object channel, ShutdownEventArgs reason)
{
}

void IBasicConsumer.HandleBasicCancel(string consumerTag)
{
}
}
}
25 changes: 1 addition & 24 deletions projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,7 @@ public async Task AsyncConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body,
CancellationToken.None);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}
}

[GlobalSetup(Target = nameof(ConsumerDispatcher))]
public async Task SetUpConsumer()
{
_consumer.Count = Count;
_dispatcher = new ConsumerDispatcher(null, Concurrency);
await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None);
}

[Benchmark]
public async Task ConsumerDispatcher()
{
using (RentedMemory body = new RentedMemory(_body))
{
for (int i = 0; i < Count; i++)
{
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body,
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, default, body,
CancellationToken.None);
}
_autoResetEvent.Wait();
Expand Down
42 changes: 29 additions & 13 deletions projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Benchmarks.Networking
{
Expand All @@ -11,29 +11,45 @@ public class Networking_BasicDeliver_Commons
{
public static async Task Publish_Hello_World(IConnection connection, uint messageCount, byte[] body)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
using (IChannel channel = await connection.CreateChannelAsync())
{
QueueDeclareOk queue = await channel.QueueDeclareAsync();
int consumed = 0;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, args) =>
{
if (Interlocked.Increment(ref consumed) == messageCount)
{
tcs.SetResult(true);
}
};
var consumer = new CountingConsumer(messageCount);
await channel.BasicConsumeAsync(queue.QueueName, true, consumer);

for (int i = 0; i < messageCount; i++)
{
await channel.BasicPublishAsync("", queue.QueueName, body);
}

await tcs.Task;
await consumer.CompletedTask.ConfigureAwait(false);
await channel.CloseAsync();
}
}
}

internal sealed class CountingConsumer : AsyncDefaultBasicConsumer
{
private int _remainingCount;
private readonly TaskCompletionSource<bool> _tcs;

public Task CompletedTask => _tcs.Task;

public CountingConsumer(uint messageCount)
{
_remainingCount = (int)messageCount;
_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

/// <inheritdoc />
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Decrement(ref _remainingCount) == 0)
{
_tcs.SetResult(true);
}

return Task.CompletedTask;
}
}
}
Loading

0 comments on commit d0021aa

Please sign in to comment.