Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow-up to #1669 - per-channel dispatch concurrency #1671

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,6 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
7 changes: 1 addition & 6 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ namespace RabbitMQ.Client
///hosts with an empty name are not addressable. </para></remarks>
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
{
/// <summary>
/// Default value for consumer dispatch concurrency.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;

/// <summary>
/// Default value for the desired maximum channel number. Default: 2047.
/// </summary>
Expand Down Expand Up @@ -180,7 +175,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency;

/// <summary>The host to connect to.</summary>
public string HostName { get; set; } = "localhost";
Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default);
}
}
6 changes: 0 additions & 6 deletions projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@ namespace RabbitMQ.Client
{
public static class IConnectionExtensions
{
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);

/// <summary>
/// Asynchronously close this connection and all its channels.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl
{
internal class Channel : ChannelBase
{
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
: base(config, session, consumerDispatchConcurrency)
{
}
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/framing/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,13 @@ public static class Constants
public const int NotImplemented = 540;
///<summary>(= 541)</summary>
public const int InternalError = 541;

/// <summary>
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
/// to set this value for every channel created on a connection,
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
/// for setting this value for a particular channel.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,14 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
public async Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
.ConfigureAwait(false);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return channel;
Expand Down
6 changes: 4 additions & 2 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ internal abstract class ChannelBase : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
protected ChannelBase(ConnectionConfig config, ISession session,
ushort? perChannelConsumerDispatchConcurrency = null)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
Action<Exception, string> onException = (exception, context) =>
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)

_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
_channel0 = new Channel(_config, _session0);

ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
{
Expand Down Expand Up @@ -253,7 +253,8 @@ await CloseAsync(ea, true,
}
}

public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
public Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
ISession session = CreateSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,42 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
protected readonly ChannelReader<WorkStruct> _reader;
private readonly ChannelWriter<WorkStruct> _writer;
private readonly Task _worker;
private readonly ushort _concurrency;
private bool _quiesce = false;
private bool _disposed;

internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
{
_channel = channel;
_concurrency = concurrency;
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
{
SingleReader = concurrency == 1,
SingleReader = _concurrency == 1,
SingleWriter = false,
AllowSynchronousContinuations = false
});
_reader = workChannel.Reader;
_writer = workChannel.Writer;

Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
if (_concurrency == 1)
{
_worker = Task.Run(loopStart);
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
var tasks = new Task[_concurrency];
for (int i = 0; i < _concurrency; i++)
{
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}
}

public bool IsShutdown
{
get
{
return _quiesce;
}
}
public bool IsShutdown => _quiesce;

public ushort Concurrency => _concurrency;

public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ internal interface IConsumerDispatcher : IDisposable

bool IsShutdown { get; }

ushort Concurrency { get; }

IAsyncBasicConsumer GetAndRemoveConsumer(string tag);

ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken);
Expand Down
11 changes: 6 additions & 5 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class IntegrationFixture : IAsyncLifetime
protected readonly ITestOutputHelper _output;
protected readonly string _testDisplayName;

protected readonly ushort _consumerDispatchConcurrency = 1;
protected readonly ushort _consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency;
protected readonly bool _openChannel = true;

public static readonly TimeSpan ShortSpan;
Expand Down Expand Up @@ -109,7 +109,7 @@ static IntegrationFixture()
}

public IntegrationFixture(ITestOutputHelper output,
ushort consumerDispatchConcurrency = 1,
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
bool openChannel = true)
{
_consumerDispatchConcurrency = consumerDispatchConcurrency;
Expand Down Expand Up @@ -143,8 +143,7 @@ public virtual async Task InitializeAsync()
*/
if (_connFactory == null)
{
_connFactory = CreateConnectionFactory();
_connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency;
_connFactory = CreateConnectionFactory(_consumerDispatchConcurrency);
}

if (_conn == null)
Expand Down Expand Up @@ -517,13 +516,15 @@ protected static async Task WaitAsync(TaskCompletionSource<bool> tcs, TimeSpan t
}
}

protected ConnectionFactory CreateConnectionFactory()
protected ConnectionFactory CreateConnectionFactory(
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
{
return new ConnectionFactory
{
ClientProvidedName = $"{_testDisplayName}:{Util.Now}:{GetConnectionIdx()}",
ContinuationTimeout = WaitSpan,
HandshakeContinuationTimeout = WaitSpan,
ConsumerDispatchConcurrency = consumerDispatchConcurrency
};
}

Expand Down
35 changes: 34 additions & 1 deletion projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,28 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration
{
public class TestAsyncConsumer : IntegrationFixture
{
private const ushort ConsumerDispatchConcurrency = 2;

private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown");

public TestAsyncConsumer(ITestOutputHelper output)
: base(output, consumerDispatchConcurrency: 2)
: base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency)
{
}

[Fact]
public async Task TestBasicRoundtripConcurrent()
{
await ValidateConsumerDispatchConcurrency();

AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);

Expand Down Expand Up @@ -146,6 +151,8 @@ public async Task TestBasicRoundtripConcurrent()
[Fact]
public async Task TestBasicRoundtripConcurrentManyMessages()
{
await ValidateConsumerDispatchConcurrency();

AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);

Expand Down Expand Up @@ -323,6 +330,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
[Fact]
public async Task TestBasicRejectAsync()
{
await ValidateConsumerDispatchConcurrency();

string queueName = GenerateQueueName();

var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -421,6 +430,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
[Fact]
public async Task TestBasicAckAsync()
{
await ValidateConsumerDispatchConcurrency();

string queueName = GenerateQueueName();

const int messageCount = 1024;
Expand Down Expand Up @@ -488,6 +499,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
[Fact]
public async Task TestBasicNackAsync()
{
await ValidateConsumerDispatchConcurrency();

var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

_conn.ConnectionShutdown += (o, ea) =>
Expand Down Expand Up @@ -561,6 +574,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
[Fact]
public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
{
await ValidateConsumerDispatchConcurrency();

AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
var tasks = new List<Task>();
for (int i = 0; i < 256; i++)
Expand All @@ -581,6 +596,8 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
[Fact]
public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
{
await ValidateConsumerDispatchConcurrency();

string exchangeName = GenerateExchangeName();
string queue1Name = GenerateQueueName();
string queue2Name = GenerateQueueName();
Expand Down Expand Up @@ -650,6 +667,8 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
[Fact]
public async Task TestCloseWithinEventHandler_GH1567()
{
await ValidateConsumerDispatchConcurrency();

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

QueueDeclareOk q = await _channel.QueueDeclareAsync();
Expand Down Expand Up @@ -679,6 +698,20 @@ await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
Assert.True(await tcs.Task);
}

private async Task ValidateConsumerDispatchConcurrency()
{
ushort expectedConsumerDispatchConcurrency = (ushort)S_Random.Next(3, 10);
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
using (IChannel ch = await _conn.CreateChannelAsync(
consumerDispatchConcurrency: expectedConsumerDispatchConcurrency))
{
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency);
}
}

private static void SetException(Exception ex, params TaskCompletionSource<bool>[] tcsAry)
{
foreach (TaskCompletionSource<bool> tcs in tcsAry)
Expand Down
9 changes: 8 additions & 1 deletion projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration
{
public class TestAsyncEventingBasicConsumer : IntegrationFixture
{
private const ushort ConsumerDispatchConcurrency = 2;

private readonly CancellationTokenSource _cts = new CancellationTokenSource(ShortSpan);
private readonly CancellationTokenRegistration _ctr;
private readonly TaskCompletionSource<bool> _onCallbackExceptionTcs =
Expand All @@ -49,7 +52,7 @@ public class TestAsyncEventingBasicConsumer : IntegrationFixture
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

public TestAsyncEventingBasicConsumer(ITestOutputHelper output)
: base(output, consumerDispatchConcurrency: 2)
: base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency)
{
_ctr = _cts.Token.Register(OnTokenCanceled);
}
Expand Down Expand Up @@ -81,6 +84,10 @@ private Task AsyncConsumerOnReceived(object sender, BasicDeliverEventArgs @event
[Fact]
public async Task TestAsyncEventingBasicConsumer_GH1038()
{
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);

string exchangeName = GenerateExchangeName();
string queueName = GenerateQueueName();
string routingKey = string.Empty;
Expand Down
Loading