Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port rabbitmq/rabbitmq-dotnet-client #950 to main #1165

Merged
merged 1 commit into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static class IConnectionExtensions
/// </remarks>
public static void Close(this IConnection connection)
{
connection.Close(Constants.ReplySuccess, "Goodbye", Timeout.InfiniteTimeSpan, false);
connection.Close(Constants.ReplySuccess, "Goodbye", TimeSpan.FromSeconds(30), false);
}

/// <summary>
Expand All @@ -37,7 +37,7 @@ public static void Close(this IConnection connection)
/// </remarks>
public static void Close(this IConnection connection, ushort reasonCode, string reasonText)
{
connection.Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan, false);
connection.Close(reasonCode, reasonText, TimeSpan.FromSeconds(30), false);
}

/// <summary>
Expand Down Expand Up @@ -93,7 +93,7 @@ public static void Close(this IConnection connection, ushort reasonCode, string
/// </remarks>
public static void Abort(this IConnection connection)
{
connection.Close(Constants.ReplySuccess, "Connection close forced", Timeout.InfiniteTimeSpan, true);
connection.Close(Constants.ReplySuccess, "Connection close forced", TimeSpan.FromSeconds(5), true);
}

/// <summary>
Expand All @@ -111,7 +111,7 @@ public static void Abort(this IConnection connection)
/// </remarks>
public static void Abort(this IConnection connection, ushort reasonCode, string reasonText)
{
connection.Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan, true);
connection.Close(reasonCode, reasonText, TimeSpan.FromSeconds(5), true);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public RecoveryAwareModel CreateNonRecoveringModel()
public override string ToString()
=> $"AutorecoveringConnection({InnerConnection.Id},{Endpoint},{GetHashCode()})";

internal IFrameHandler FrameHandler => InnerConnection.FrameHandler;

internal void Init()
{
Init(_factory.EndpointResolverFactory(new List<AmqpTcpEndpoint> { _factory.Endpoint }));
Expand Down
24 changes: 16 additions & 8 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,6 @@ public Connection(IConnectionFactory factory, IFrameHandler frameHandler, string
public bool IsOpen => CloseReason is null;

public int LocalPort => _frameHandler.LocalPort;

///<summary>Another overload of a Protocol property, useful
///for exposing a tighter type.</summary>
internal ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol;

public int RemotePort => _frameHandler.RemotePort;

public IDictionary<string, object?>? ServerProperties { get; private set; }
Expand All @@ -123,6 +118,16 @@ public Connection(IConnectionFactory factory, IFrameHandler frameHandler, string
///<summary>Explicit implementation of IConnection.Protocol.</summary>
IProtocol IConnection.Protocol => Endpoint.Protocol;

///<summary>Another overload of a Protocol property, useful
///for exposing a tighter type.</summary>
internal ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol;

///<summary>Used for testing only.</summary>
internal IFrameHandler FrameHandler
{
get { return _frameHandler; }
}

public event EventHandler<CallbackExceptionEventArgs> CallbackException
{
add => _callbackExceptionWrapper.AddHandler(value);
Expand Down Expand Up @@ -259,7 +264,7 @@ public void Close(ushort reasonCode, string reasonText, TimeSpan timeout, bool a
///</para>
///<para>
///Timeout determines how much time internal close operations should be given
///to complete. System.Threading.Timeout.InfiniteTimeSpan value means infinity.
///to complete.
///</para>
///</remarks>
internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
Expand All @@ -279,8 +284,11 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
try
{
// Try to send connection.close wait for CloseOk in the MainLoop
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
if (!_closed)
{
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
}
}
catch (AlreadyClosedException)
{
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ protected void HandleConnectionStart(in IncomingCommand cmd)
if (m_connectionStartCell is null)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start");
Session.Connection.Close(reason, false, Timeout.InfiniteTimeSpan);
Session.Connection.Close(reason, false, TimeSpan.FromSeconds(30));
}

var method = new ConnectionStart(cmd.MethodBytes.Span);
Expand Down
6 changes: 5 additions & 1 deletion projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,11 @@ public void Close()
{
lock (_semaphore)
{
if (!_closed)
if (_closed || _socket == null)
{
return;
}
else
{
try
{
Expand Down
56 changes: 56 additions & 0 deletions projects/Unit/TestConnectionShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
using System.Threading;

using RabbitMQ.Client.Impl;
using RabbitMQ.Client.Framing.Impl;

using Xunit;

Expand All @@ -41,6 +42,61 @@ namespace RabbitMQ.Client.Unit

public class TestConnectionShutdown : IntegrationFixture
{
[Fact]
public void TestCleanClosureWithSocketClosedOutOfBand()
{
_conn = CreateAutorecoveringConnection();
_model = _conn.CreateModel();

var latch = new ManualResetEventSlim(false);
_model.ModelShutdown += (model, args) => {
latch.Set();
};

var c = (AutorecoveringConnection)_conn;
c.FrameHandler.Close();

_conn.Close(TimeSpan.FromSeconds(4));
Wait(latch, TimeSpan.FromSeconds(5));
}

[Fact]
public void TestAbortWithSocketClosedOutOfBand()
{
_conn = CreateAutorecoveringConnection();
_model = _conn.CreateModel();

var latch = new ManualResetEventSlim(false);
_model.ModelShutdown += (model, args) => {
latch.Set();
};

var c = (AutorecoveringConnection)_conn;
c.FrameHandler.Close();

_conn.Abort();
// default Connection.Abort() timeout and then some
Wait(latch, TimeSpan.FromSeconds(6));
}

[Fact]
public void TestDisposedWithSocketClosedOutOfBand()
{
_conn = CreateAutorecoveringConnection();
_model = _conn.CreateModel();

var latch = new ManualResetEventSlim(false);
_model.ModelShutdown += (model, args) => {
latch.Set();
};

var c = (AutorecoveringConnection)_conn;
c.FrameHandler.Close();

_conn.Dispose();
Wait(latch, TimeSpan.FromSeconds(3));
}

[Fact]
public void TestShutdownSignalPropagationToChannels()
{
Expand Down