Skip to content

Commit

Permalink
Merge pull request #1560 from rabbitmq/lukebakken/misc-changes-from-a…
Browse files Browse the repository at this point in the history
…sync-session-shutdown

Misc changes
  • Loading branch information
lukebakken authored May 9, 2024
2 parents d39d79a + 0a06a23 commit 5adb0a8
Show file tree
Hide file tree
Showing 13 changed files with 396 additions and 213 deletions.
14 changes: 11 additions & 3 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ jobs:
Receive-Job -Job $tx; `
& "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; `
dotnet test `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' `
--environment 'PASSWORD=grapefruit' `
--environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" `
Expand Down Expand Up @@ -114,7 +114,12 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Sequential Integration Tests
run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
run: dotnet test `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
--environment 'PASSWORD=grapefruit' `
--environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" `
"${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Maybe upload RabbitMQ logs
if: failure()
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -182,8 +187,8 @@ jobs:
- name: Integration Tests
run: |
dotnet test \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \
Expand Down Expand Up @@ -222,7 +227,10 @@ jobs:
- name: Sequential Integration Tests
run: |
dotnet test \
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \
"${{ github.workspace }}/projects/Test/SequentialIntegration/SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Maybe upload RabbitMQ logs
if: failure()
Expand Down
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ build:
test:
dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed'
dotnet test --environment 'GITHUB_ACTIONS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
"$(CURDIR)/projects/Test/Integration/Integration.csproj" --logger 'console;verbosity=detailed'
dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed'
dotnet test --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
$(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed'

# Note:
# You must have the expected OAuth2 environment set up for this target
Expand Down
88 changes: 88 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client.Impl
{
#nullable enable
internal struct AsyncEventingWrapper<T>
{
private event AsyncEventHandler<T>? _event;
private Delegate[]? _handlers;
private string? _context;
private Func<Exception, string, Task>? _onException;

public readonly bool IsEmpty => _event is null;

public AsyncEventingWrapper(string context, Func<Exception, string, Task> onException)
{
_event = null;
_handlers = null;
_context = context;
_onException = onException;
}

public void AddHandler(AsyncEventHandler<T>? handler)
{
_event += handler;
_handlers = null;
}

public void RemoveHandler(AsyncEventHandler<T>? handler)
{
_event -= handler;
_handlers = null;
}

// Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied)
public Task InvokeAsync(object sender, T parameter)
{
Delegate[]? handlers = _handlers;
if (handlers is null)
{
handlers = _event?.GetInvocationList();
if (handlers is null)
{
return Task.CompletedTask;
}

_handlers = handlers;
}

return InternalInvoke(handlers, sender, parameter);
}

private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter)
{
foreach (AsyncEventHandler<T> action in handlers.Cast<AsyncEventHandler<T>>())
{
try
{
await action(sender, parameter)
.ConfigureAwait(false);
}
catch (Exception exception)
{
if (_onException != null)
{
await _onException(exception, _context!)
.ConfigureAwait(false);
}
else
{
throw;
}
}
}
}

public void Takeover(in AsyncEventingWrapper<T> other)
{
_event = other._event;
_handlers = other._handlers;
_context = other._context;
_onException = other._onException;
}
}
}
7 changes: 7 additions & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ await FinishCloseAsync(cts.Token)
_heartbeatReadTimer?.Change((int)Heartbeat.TotalMilliseconds, Timeout.Infinite);
}
}
catch (OperationCanceledException)
{
if (false == _mainLoopCts.IsCancellationRequested)
{
throw;
}
}
catch (ObjectDisposedException)
{
// timer is already disposed,
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
Expand Down Expand Up @@ -335,6 +336,13 @@ await _session0.TransmitAsync(method, cancellationToken)
.ConfigureAwait(false);
}
}
catch (ChannelClosedException)
{
if (false == abort)
{
throw;
}
}
catch (AlreadyClosedException)
{
if (false == abort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,49 @@ internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)

protected override async Task ProcessChannelAsync(CancellationToken token)
{
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
try
{
while (_reader.TryRead(out WorkStruct work))
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
{
using (work)
while (_reader.TryRead(out WorkStruct work))
{
try
using (work)
{
Task task = work.WorkType switch
try
{
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
work.ConsumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),
Task task = work.WorkType switch
{
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
work.ConsumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),

WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),
WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),

WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),
WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),

WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),
WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),

WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),
WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),

_ => Task.CompletedTask
};
await task.ConfigureAwait(false);
}
catch (Exception e)
{
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
_ => Task.CompletedTask
};
await task.ConfigureAwait(false);
}
catch (Exception e)
{
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
}
}
}
}
}
catch (OperationCanceledException)
{
if (false == token.IsCancellationRequested)
{
throw;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,55 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency)

protected override async Task ProcessChannelAsync(CancellationToken token)
{
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
try
{
while (_reader.TryRead(out var work))
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
{
using (work)
while (_reader.TryRead(out WorkStruct work))
{
try
using (work)
{
IBasicConsumer consumer = work.Consumer;
string? consumerTag = work.ConsumerTag;
switch (work.WorkType)
try
{
case WorkType.Deliver:
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
.ConfigureAwait(false);
break;
case WorkType.Cancel:
consumer.HandleBasicCancel(consumerTag);
break;
case WorkType.CancelOk:
consumer.HandleBasicCancelOk(consumerTag);
break;
case WorkType.ConsumeOk:
consumer.HandleBasicConsumeOk(consumerTag);
break;
case WorkType.Shutdown:
consumer.HandleChannelShutdown(_channel, work.Reason);
break;
IBasicConsumer consumer = work.Consumer;
string? consumerTag = work.ConsumerTag;
switch (work.WorkType)
{
case WorkType.Deliver:
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
.ConfigureAwait(false);
break;
case WorkType.Cancel:
consumer.HandleBasicCancel(consumerTag);
break;
case WorkType.CancelOk:
consumer.HandleBasicCancelOk(consumerTag);
break;
case WorkType.ConsumeOk:
consumer.HandleBasicConsumeOk(consumerTag);
break;
case WorkType.Shutdown:
consumer.HandleChannelShutdown(_channel, work.Reason);
break;
}
}
catch (Exception e)
{
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
}
}
catch (Exception e)
{
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
}
}
}
}
catch (OperationCanceledException)
{
if (false == token.IsCancellationRequested)
{
throw;
}
}
}
}
}
Loading

0 comments on commit 5adb0a8

Please sign in to comment.