Skip to content

Commit

Permalink
Spike RPC acquiring
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Dec 9, 2024
1 parent a8dc8ce commit 0bbf5f2
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 94 deletions.
7 changes: 6 additions & 1 deletion projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@

namespace RabbitMQ.Client.Impl
{
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation
internal interface IAsyncRpcContinuation : IRpcContinuation
{
CancellationToken CancellationToken { get; }
}

internal abstract class AsyncRpcContinuation<T> : IAsyncRpcContinuation
{
private readonly CancellationTokenSource _continuationTimeoutCancellationTokenSource;
private readonly CancellationTokenRegistration _continuationTimeoutCancellationTokenRegistration;
Expand Down
41 changes: 14 additions & 27 deletions projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,43 +126,30 @@ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
}

private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
private async Task MaybeConfirmSelect(RpcParentLease<ChannelOpenAsyncRpcContinuation> parentLease,
CancellationToken cancellationToken)
{
if (_publisherConfirmationsEnabled)
{
// NOTE: _rpcSemaphore is held
bool enqueued = false;
var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
using var lease = AcquireRpcChildLease(parentLease, new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken));

try
if (_nextPublishSeqNo == 0UL)
{
if (_nextPublishSeqNo == 0UL)
if (_publisherConfirmationTrackingEnabled)
{
if (_publisherConfirmationTrackingEnabled)
{
_confirmsTaskCompletionSources.Clear();
}
_nextPublishSeqNo = 1;
_confirmsTaskCompletionSources.Clear();
}
_nextPublishSeqNo = 1;
}

enqueued = Enqueue(k);

var method = new ConfirmSelect(false);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
var method = new ConfirmSelect(false);
await ModelSendAsync(in method, lease.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await lease.Continuation;
Debug.Assert(result);

return;
}
finally
{
if (false == enqueued)
{
k.Dispose();
}
}
return;
}
}

Expand Down
177 changes: 111 additions & 66 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,26 +203,23 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort,
public async Task CloseAsync(ShutdownEventArgs args, bool abort,
CancellationToken cancellationToken)
{
bool enqueued = false;
var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken);

await _rpcSemaphore.WaitAsync(k.CancellationToken)
using var lease = await AcquireRpcLeaseAsync(new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken))
.ConfigureAwait(false);

try
{
ChannelShutdownAsync += k.OnConnectionShutdownAsync;
enqueued = Enqueue(k);
ChannelShutdownAsync += lease.Continuation.OnConnectionShutdownAsync;
ConsumerDispatcher.Quiesce();

if (SetCloseReason(args))
{
var method = new ChannelClose(
args.ReplyCode, args.ReplyText, args.ClassId, args.MethodId);
await ModelSendAsync(in method, k.CancellationToken)
await ModelSendAsync(in method, lease.CancellationToken)
.ConfigureAwait(false);
}

bool result = await k;
bool result = await lease.Continuation;
Debug.Assert(result);

await ConsumerDispatcher.WaitForShutdownAsync()
Expand Down Expand Up @@ -251,12 +248,7 @@ await ConsumerDispatcher.WaitForShutdownAsync()
}
finally
{
if (false == enqueued)
{
k.Dispose();
}
_rpcSemaphore.Release();
ChannelShutdownAsync -= k.OnConnectionShutdownAsync;
ChannelShutdownAsync -= lease.Continuation.OnConnectionShutdownAsync;
}
}

Expand All @@ -272,38 +264,22 @@ internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationTok
internal async ValueTask<ConnectionSecureOrTune> ConnectionSecureOkAsync(byte[] response,
CancellationToken cancellationToken)
{
bool enqueued = false;
var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken);

await _rpcSemaphore.WaitAsync(k.CancellationToken)
using var lease = await AcquireRpcLeaseAsync(new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken))
.ConfigureAwait(false);
try
{
enqueued = Enqueue(k);

try
{
var method = new ConnectionSecureOk(response);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
}
catch (AlreadyClosedException)
{
// let continuation throw OperationInterruptedException,
// which is a much more suitable exception before connection
// negotiation finishes
}

return await k;
var method = new ConnectionSecureOk(response);
await ModelSendAsync(in method, lease.CancellationToken)
.ConfigureAwait(false);
}
finally
catch (AlreadyClosedException)
{
if (false == enqueued)
{
k.Dispose();
}
_rpcSemaphore.Release();
// let continuation throw OperationInterruptedException,
// which is a much more suitable exception before connection
// negotiation finishes
}

return await lease.Continuation;
}

internal async ValueTask<ConnectionSecureOrTune> ConnectionStartOkAsync(
Expand Down Expand Up @@ -359,40 +335,111 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync(CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken)
private RpcChildLease<TParent, TContinuation> AcquireRpcChildLease<TParent, TContinuation>(TParent parent, TContinuation k)
where TParent : struct
where TContinuation : IAsyncRpcContinuation
{
ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled,
createChannelOptions.PublisherConfirmationTrackingEnabled,
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter);

bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
if (IsOpen)
{
_continuationQueue.Enqueue(k);
return new RpcChildLease<TParent, TContinuation>(parent, k, false);
}
else
{
k.HandleChannelShutdown(CloseReason);
return new RpcChildLease<TParent, TContinuation>(parent, k, true);
}
}

private async ValueTask<RpcParentLease<TContinuation>> AcquireRpcLeaseAsync<TContinuation>(TContinuation k)
where TContinuation : IAsyncRpcContinuation
{
await _rpcSemaphore.WaitAsync(k.CancellationToken)
.ConfigureAwait(false);
try
if (IsOpen)
{
enqueued = Enqueue(k);
_continuationQueue.Enqueue(k);
return new RpcParentLease<TContinuation>(_rpcSemaphore, k, false);
}
else
{
k.HandleChannelShutdown(CloseReason);
return new RpcParentLease<TContinuation>(_rpcSemaphore, k, true);
}
}

var method = new ChannelOpen();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
readonly struct RpcChildLease<TParent, TContinuation> : IDisposable
where TParent : struct
where TContinuation : IAsyncRpcContinuation
{

bool result = await k;
Debug.Assert(result);
private readonly bool _ownsContinuation;

await MaybeConfirmSelect(cancellationToken)
.ConfigureAwait(false);
public RpcChildLease(TParent parent, TContinuation continuation, bool ownsContinuation)
{
Continuation = continuation;
_ownsContinuation = ownsContinuation;
}
finally

public TContinuation Continuation { get; }

public CancellationToken CancellationToken => Continuation.CancellationToken;

public void Dispose()
{
if (false == enqueued)
if (_ownsContinuation)
{
k.Dispose();
Continuation.Dispose();
}
_rpcSemaphore.Release();
}
}

readonly struct RpcParentLease<TContinuation> : IDisposable
where TContinuation : IAsyncRpcContinuation
{

private readonly bool _ownsContinuation;
private readonly SemaphoreSlim _semaphore;

public RpcParentLease(SemaphoreSlim semaphore, TContinuation continuation, bool ownsContinuation)
{
_semaphore = semaphore;
Continuation = continuation;
_ownsContinuation = ownsContinuation;
}

public TContinuation Continuation { get; }

public CancellationToken CancellationToken => Continuation.CancellationToken;

public void Dispose()
{
if (_ownsContinuation)
{
Continuation.Dispose();
}
_semaphore.Release();
}
}

internal async Task<IChannel> OpenAsync(CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled,
createChannelOptions.PublisherConfirmationTrackingEnabled,
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter);

using var lease = await AcquireRpcLeaseAsync(new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken))
.ConfigureAwait(false);
var method = new ChannelOpen();
await ModelSendAsync(in method, lease.CancellationToken)
.ConfigureAwait(false);

bool result = await lease.Continuation;
Debug.Assert(result);

await MaybeConfirmSelect(lease, cancellationToken)
.ConfigureAwait(false);

return this;
}
Expand Down Expand Up @@ -422,11 +469,9 @@ private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken can
if (false == await DispatchCommandAsync(cmd, cancellationToken)
.ConfigureAwait(false))
{
using (IRpcContinuation c = _continuationQueue.Next())
{
await c.HandleCommandAsync(cmd)
.ConfigureAwait(false);
}
using IRpcContinuation c = _continuationQueue.Next();
await c.HandleCommandAsync(cmd)
.ConfigureAwait(false);
}
}
finally
Expand Down

0 comments on commit 0bbf5f2

Please sign in to comment.