Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement full async channel #982

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@
</AssemblyAttribute>
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
<PackageReference Include="System.Memory" Version="4.5.4" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.FxCopAnalyzers" Version="3.3.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="MinVer" Version="2.3.0" PrivateAssets="All" />
<PackageReference Include="System.Memory" Version="4.5.4" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
</ItemGroup>

Expand Down
19 changes: 19 additions & 0 deletions projects/RabbitMQ.Client/client/FrameworkExtension/Interlocked.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Runtime.CompilerServices;

namespace RabbitMQ.Client
{
#if NETCOREAPP3_1 || NETSTANDARD
internal static class Interlocked
lukebakken marked this conversation as resolved.
Show resolved Hide resolved
{
public static ulong CompareExchange(ref ulong location1, ulong value, ulong comparand)
{
return (ulong)System.Threading.Interlocked.CompareExchange(ref Unsafe.As<ulong, long>(ref location1), (long)value, (long)comparand);
}

public static ulong Increment(ref ulong location1)
{
return (ulong)System.Threading.Interlocked.Add(ref Unsafe.As<ulong, long>(ref location1), 1L);
}
}
#endif
}
26 changes: 13 additions & 13 deletions projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
Expand All @@ -12,29 +12,29 @@ public class AsyncDefaultBasicConsumer : IBasicConsumer, IAsyncBasicConsumer
private readonly HashSet<string> _consumerTags = new HashSet<string>();

/// <summary>
/// Creates a new instance of an <see cref="DefaultBasicConsumer"/>.
/// Creates a new instance of an <see cref="AsyncDefaultBasicConsumer"/>.
/// </summary>
public AsyncDefaultBasicConsumer()
{
ShutdownReason = null;
Model = null;
Channel = null;
IsRunning = false;
}

/// <summary>
/// Constructor which sets the Model property to the given value.
/// Constructor which sets the <see cref="Channel"/> property to the given value.
/// </summary>
/// <param name="model">Common AMQP model.</param>
public AsyncDefaultBasicConsumer(IModel model)
/// <param name="channel">The channel.</param>
public AsyncDefaultBasicConsumer(IChannel channel)
{
ShutdownReason = null;
IsRunning = false;
Model = model;
Channel = channel;
}

/// <summary>
/// Retrieve the consumer tags this consumer is registered as; to be used when discussing this consumer
/// with the server, for instance with <see cref="IModel.BasicCancel"/>.
/// with the server, for instance with <see cref="IChannel.CancelConsumerAsync"/>.
/// </summary>
public string[] ConsumerTags
{
Expand All @@ -50,7 +50,7 @@ public string[] ConsumerTags
public bool IsRunning { get; protected set; }

/// <summary>
/// If our <see cref="IModel"/> shuts down, this property will contain a description of the reason for the
/// If our <see cref="IChannel"/> shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
/// </summary>
public ShutdownEventArgs ShutdownReason { get; protected set; }
Expand All @@ -61,10 +61,10 @@ public string[] ConsumerTags
public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled;

/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IModel Model { get; set; }
public IChannel Channel { get; set; }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love how channel is better aligned with the purpose


/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
Expand Down Expand Up @@ -101,7 +101,7 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
/// Called each time a message is delivered for this consumer.
/// </summary>
/// <remarks>
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IModel.BasicAck"/>
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IChannel.AckMessageAsync"/>
/// if consuming in automatic acknowledgement mode.
/// Subclasses must copy or fully use delivery body before returning.
/// Accessing the body at a later point is unsafe as its memory can
Expand All @@ -120,7 +120,7 @@ public virtual Task HandleBasicDeliver(string consumerTag,
}

/// <summary>
/// Called when the model (channel) this consumer was registered on terminates.
/// Called when the channel this consumer was registered on terminates.
/// </summary>
/// <param name="model">A channel this consumer was registered on.</param>
/// <param name="reason">Shutdown context.</param>
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ namespace RabbitMQ.Client
/// //
/// IConnection conn = factory.CreateConnection();
/// //
/// IModel ch = conn.CreateModel();
/// IChannel ch = await conn.CreateChannelAsync().ConfigureAwait(false);
/// //
/// // ... use ch's IModel methods ...
/// // ... use ch's IChannel methods ...
/// //
/// ch.Close(Constants.ReplySuccess, "Closing the channel");
/// await ch.CloseAsync().ConfigureAwait(false);
/// conn.Close(Constants.ReplySuccess, "Closing the connection");
/// </code></example>
/// <para>
Expand Down Expand Up @@ -492,7 +492,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
else
{
var protocol = new RabbitMQ.Client.Framing.Protocol();
conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(CreateFrameHandler), clientProvidedName);
conn = protocol.CreateConnection(this, endpointResolver.SelectOne(CreateFrameHandler), clientProvidedName);
}
}
catch (Exception e)
Expand Down
24 changes: 12 additions & 12 deletions projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
using System;
using System.Collections.Generic;
using System.Linq;

using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
Expand All @@ -56,24 +56,24 @@ public class DefaultBasicConsumer : IBasicConsumer
public DefaultBasicConsumer()
{
ShutdownReason = null;
Model = null;
Channel = null;
IsRunning = false;
}

/// <summary>
/// Constructor which sets the Model property to the given value.
/// Constructor which sets the Channel property to the given value.
/// </summary>
/// <param name="model">Common AMQP model.</param>
public DefaultBasicConsumer(IModel model)
/// <param name="channel">The channel.</param>
public DefaultBasicConsumer(IChannel channel)
{
ShutdownReason = null;
IsRunning = false;
Model = model;
Channel = channel;
}

/// <summary>
/// Retrieve the consumer tags this consumer is registered as; to be used to identify
/// this consumer, for example, when cancelling it with <see cref="IModel.BasicCancel"/>.
/// this consumer, for example, when cancelling it with <see cref="IChannel.CancelConsumerAsync"/>.
/// This value is an array because a single consumer instance can be reused to consume on
/// multiple channels.
/// </summary>
Expand All @@ -91,7 +91,7 @@ public string[] ConsumerTags
public bool IsRunning { get; protected set; }

/// <summary>
/// If our <see cref="IModel"/> shuts down, this property will contain a description of the reason for the
/// If our <see cref="IChannel"/> shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
/// </summary>
public ShutdownEventArgs ShutdownReason { get; protected set; }
Expand All @@ -102,10 +102,10 @@ public string[] ConsumerTags
public event EventHandler<ConsumerEventArgs> ConsumerCancelled;

/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IModel Model { get; set; }
public IChannel Channel { get; set; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
Expand Down Expand Up @@ -141,7 +141,7 @@ public virtual void HandleBasicConsumeOk(string consumerTag)
/// Called each time a message is delivered for this consumer.
/// </summary>
/// <remarks>
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IModel.BasicAck"/>
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IChannel.AckMessageAsync"/>
/// if consuming in automatic acknowledgement mode.
/// Subclasses must copy or fully use delivery body before returning.
/// Accessing the body at a later point is unsafe as its memory can
Expand All @@ -159,7 +159,7 @@ public virtual void HandleBasicDeliver(string consumerTag,
}

/// <summary>
/// Called when the model (channel) this consumer was registered on terminates.
/// Called when the channel this consumer was registered on terminates.
/// </summary>
/// <param name="model">A channel this consumer was registered on.</param>
/// <param name="reason">Shutdown context.</param>
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/ExchangeType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace RabbitMQ.Client
/// </summary>
/// <remarks>
/// Use the static members of this class as values for the
/// "exchangeType" arguments for IModel methods such as
/// "exchangeType" arguments for IChannel methods such as
/// ExchangeDeclare. The broker may be extended with additional
/// exchange types that do not appear in this class.
/// </remarks>
Expand Down
14 changes: 7 additions & 7 deletions projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
using System;
using System.Threading.Tasks;

using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
{
public interface IAsyncBasicConsumer
{
/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
IModel Model { get; }
IChannel Channel { get; }

/// <summary>
/// Signalled when the consumer gets cancelled.
Expand Down Expand Up @@ -43,7 +43,7 @@ public interface IAsyncBasicConsumer
/// </summary>
/// <remarks>
/// Does nothing with the passed in information.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IChannel.AckMessageAsync"/>.
/// The implementation of this method in this class does NOT acknowledge such messages.
/// </remarks>
Task HandleBasicDeliver(string consumerTag,
bollhals marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -55,10 +55,10 @@ Task HandleBasicDeliver(string consumerTag,
ReadOnlyMemory<byte> body);

/// <summary>
/// Called when the model shuts down.
/// Called when the channel shuts down.
/// </summary>
/// <param name="model"> Common AMQP model.</param>
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
/// <param name="model">The channel.</param>
/// <param name="reason"> Information about the reason why a particular channel, session, or connection was destroyed.</param>
Task HandleModelShutdown(object model, ShutdownEventArgs reason);
bollhals marked this conversation as resolved.
Show resolved Hide resolved
}
}
17 changes: 7 additions & 10 deletions projects/RabbitMQ.Client/client/api/IBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//---------------------------------------------------------------------------

using System;

using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
Expand All @@ -39,9 +39,6 @@ namespace RabbitMQ.Client
///receive messages from a queue by subscription.</summary>
/// <remarks>
/// <para>
/// See IModel.BasicConsume, IModel.BasicCancel.
/// </para>
/// <para>
/// Note that the "Handle*" methods run in the connection's
/// thread! Consider using <see cref="EventingBasicConsumer"/>, which uses a
/// SharedQueue instance to safely pass received messages across
Expand All @@ -51,10 +48,10 @@ namespace RabbitMQ.Client
public interface IBasicConsumer
{
/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
IModel Model { get; }
IChannel Channel { get; }

/// <summary>
/// Signalled when the consumer gets cancelled.
Expand Down Expand Up @@ -86,7 +83,7 @@ public interface IBasicConsumer
/// </summary>
/// <remarks>
/// Does nothing with the passed in information.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IChannel.AckMessageAsync"/>.
/// The implementation of this method in this class does NOT acknowledge such messages.
/// </remarks>
void HandleBasicDeliver(string consumerTag,
bollhals marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -98,10 +95,10 @@ void HandleBasicDeliver(string consumerTag,
ReadOnlyMemory<byte> body);

/// <summary>
/// Called when the model shuts down.
/// Called when the channel shuts down.
/// </summary>
/// <param name="model"> Common AMQP model.</param>
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
/// <param name="model">The channel.</param>
/// <param name="reason"> Information about the reason why a particular channel, session, or connection was destroyed.</param>
void HandleModelShutdown(object model, ShutdownEventArgs reason);
bollhals marked this conversation as resolved.
Show resolved Hide resolved
}
}
6 changes: 0 additions & 6 deletions projects/RabbitMQ.Client/client/api/IBasicProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ namespace RabbitMQ.Client
/// 0-8, 0-8qpid, 0-9 and 0-9-1 of AMQP.</summary>
/// <remarks>
/// <para>
/// The specification code generator provides
/// protocol-version-specific implementations of this interface. To
/// obtain an implementation of this interface in a
/// protocol-version-neutral way, use <see cref="IModel.CreateBasicProperties"/>.
/// </para>
/// <para>
/// Each property is readable, writable and clearable: a cleared
/// property will not be transmitted over the wire. Properties on a
/// fresh instance are clear by default.
Expand Down
14 changes: 7 additions & 7 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
using System.Collections.Generic;
using System.IO;
using System.Threading;

using System.Threading.Tasks;
using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
Expand Down Expand Up @@ -218,7 +218,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// Abort this connection and all its channels.
/// </summary>
/// <remarks>
/// Note that all active channels, sessions, and models will be closed if this method is called.
/// Note that all active channels, sessions, and consumers will be closed if this method is called.
/// In comparison to normal <see cref="Close()"/> method, <see cref="Abort()"/> will not throw
/// <see cref="IOException"/> during closing connection.
///This method waits infinitely for the in-progress close operation to complete.
Expand Down Expand Up @@ -275,7 +275,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// Close this connection and all its channels.
/// </summary>
/// <remarks>
/// Note that all active channels, sessions, and models will be
/// Note that all active channels, sessions, and consumers will be
/// closed if this method is called. It will wait for the in-progress
/// close operation to complete. This method will not return to the caller
/// until the shutdown is complete. If the connection is already closed
Expand Down Expand Up @@ -304,7 +304,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// and wait with a timeout for all the in-progress close operations to complete.
/// </summary>
/// <remarks>
/// Note that all active channels, sessions, and models will be
/// Note that all active channels, sessions, and consumers will be
/// closed if this method is called. It will wait for the in-progress
/// close operation to complete with a timeout. If the connection is
/// already closed (or closing), then this method will do nothing.
Expand Down Expand Up @@ -336,9 +336,9 @@ public interface IConnection : INetworkConnection, IDisposable
void Close(ushort reasonCode, string reasonText, TimeSpan timeout);
bollhals marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Create and return a fresh channel, session, and model.
/// Create and return a fresh channel, session.
/// </summary>
IModel CreateModel();
ValueTask<IChannel> CreateChannelAsync();

/// <summary>
/// Handle incoming Connection.Blocked methods.
Expand Down
Loading