Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge pull request #1224 from rabbitmq/rabbitmq-dotnet-client-1223-6.x #1226

Merged
merged 3 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
_site/

###################
## Generated files
###################
Expand Down
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
## Changes Between 6.3.1 and 6.4.0

This release adds the ability to specify a maximum message size when receiving data. The default
values are:

* RabbitMQ .NET client 7.0.0 and beyond: 128MiB
* RabbitMQ .NET client 6.4.0 up to 7.0.0: no limit by default

Receiving a frame that specifies a content larger than the limit will throw an execption. This is to
help prevent situations as described in [this discussion](https://github.com/rabbitmq/rabbitmq-dotnet-client/discussions/1213).

To set a limit, use the set `MaxMessageSize` on your `ConnectionFactory` before opening connections:

```
// This sets the limit to 512MiB
var cf = new ConnectionFactory();
cf.MaxMessageSize = 536870912;
var conn = cf.CreateConnection()`
```

GitHub milestone: [`6.4.0`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/58?closed=1)
Diff: [link](https://github.com/rabbitmq/rabbitmq-dotnet-client/compare/v6.3.1...v6.4.0)

## Changes Between 6.3.0 and 6.3.1

GitHub milestone: [`6.3.1`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/57?closed=1)
Expand Down
35 changes: 19 additions & 16 deletions projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,33 @@ public class AmqpTcpEndpoint// : ICloneable
public const int UseDefaultPort = -1;

private int _port;
private uint _maxMessageSize = Constants.DefaultMaxMessageSizeInBytes;

private readonly uint _maxMessageSize;

/// <summary>
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
/// </summary>
/// <param name="hostName">Hostname.</param>
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
/// <param name="ssl">Ssl option.</param>
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl)
/// <param name="maxMessageSize">Maximum message size from RabbitMQ. 0 means "unlimited"</param>
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl, uint maxMessageSize)
{
HostName = hostName;
_port = portOrMinusOne;
Ssl = ssl;
_maxMessageSize = maxMessageSize;
}

/// <summary>
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
/// </summary>
/// <param name="hostName">Hostname.</param>
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
/// <param name="ssl">Ssl option.</param>
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl) :
this(hostName, portOrMinusOne, ssl, ConnectionFactory.DefaultMaxMessageSize)
{
}

/// <summary>
Expand Down Expand Up @@ -119,7 +133,7 @@ public AmqpTcpEndpoint(Uri uri) : this(uri.Host, uri.Port)
/// <returns>A copy with the same hostname, port, and TLS settings</returns>
public object Clone()
{
return new AmqpTcpEndpoint(HostName, _port, Ssl);
return new AmqpTcpEndpoint(HostName, _port, Ssl, _maxMessageSize);
}

/// <summary>
Expand All @@ -129,7 +143,7 @@ public object Clone()
/// <returns>A copy with the provided hostname and port/TLS settings of this endpoint</returns>
public AmqpTcpEndpoint CloneWithHostname(string hostname)
{
return new AmqpTcpEndpoint(hostname, _port, Ssl);
return new AmqpTcpEndpoint(hostname, _port, Ssl, _maxMessageSize);
}

/// <summary>
Expand Down Expand Up @@ -179,22 +193,11 @@ public IProtocol Protocol
public SslOption Ssl { get; set; }

/// <summary>
/// Set the maximum size for a message in bytes. Setting it to 0 reverts to the default of 128MiB
/// Get the maximum size for a message in bytes. The default value is 128MiB to match RabbitMQ's default
/// </summary>
public uint MaxMessageSize
{
get { return _maxMessageSize; }
set
{
if (value == default(uint))
{
_maxMessageSize = Constants.DefaultMaxMessageSizeInBytes;
}
else
{
_maxMessageSize = value;
}
}
}

/// <summary>
Expand Down
17 changes: 16 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace RabbitMQ.Client
/// factory.VirtualHost = ConnectionFactory.DefaultVHost;
/// factory.HostName = hostName;
/// factory.Port = AmqpTcpEndpoint.UseDefaultPort;
/// factory.MaxMessageSize = 512 * 1024 * 1024;
/// //
/// IConnection conn = factory.CreateConnection();
/// //
Expand Down Expand Up @@ -103,6 +104,13 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
public const uint DefaultFrameMax = 0;

/// <summary>
/// Default value for the maximum allowed message size, in bytes, from RabbitMQ.
/// Corresponds to the <code>rabbit.max_message_size</code> setting.
/// Note: the default is 0 which means "unlimited".
/// </summary>
public const uint DefaultMaxMessageSize = 134217728;

/// <summary>
/// Default value for desired heartbeat interval. Default is 60 seconds,
/// TimeSpan.Zero means "heartbeats are disabled".
Expand Down Expand Up @@ -264,12 +272,13 @@ public ConnectionFactory()
/// </summary>
public AmqpTcpEndpoint Endpoint
{
get { return new AmqpTcpEndpoint(HostName, Port, Ssl); }
get { return new AmqpTcpEndpoint(HostName, Port, Ssl, MaxMessageSize); }
set
{
Port = value.Port;
HostName = value.HostName;
Ssl = value.Ssl;
MaxMessageSize = value.MaxMessageSize;
}
}

Expand Down Expand Up @@ -317,6 +326,12 @@ public AmqpTcpEndpoint Endpoint
/// </summary>
public string VirtualHost { get; set; } = DefaultVHost;

/// <summary>
/// Maximum allowed message size, in bytes, from RabbitMQ.
/// Corresponds to the <code>rabbit.max_message_size</code> setting.
/// </summary>
public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize;

/// <summary>
/// The uri to use for the connection.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,20 @@ namespace RabbitMQ.Client.Exceptions
///requiring a connection.close.</summary>
public abstract class HardProtocolException : ProtocolException
{
protected readonly bool _canShutdownCleanly = true;

protected HardProtocolException(string message) : base(message)
{
}

protected HardProtocolException(string message, bool canShutdownCleanly) : base(message)
{
_canShutdownCleanly = canShutdownCleanly;
}

public bool CanShutdownCleanly
{
get { return _canShutdownCleanly; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public MalformedFrameException(string message) : base(message)
{
}

public MalformedFrameException(string message, bool canShutdownCleanly) :
base(message, canShutdownCleanly)
{
}

public override ushort ReplyCode
{
get { return Constants.FrameError; }
Expand Down
3 changes: 0 additions & 3 deletions projects/RabbitMQ.Client/client/framing/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,5 @@ public static class Constants
public const int NotImplemented = 540;
///<summary>(= 541)</summary>
public const int InternalError = 541;

///<summary>(= 134217728)</summary>
public const uint DefaultMaxMessageSizeInBytes = 134217728;
}
}
5 changes: 4 additions & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
{
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
ClosingLoop();
if (hpe.CanShutdownCleanly)
{
ClosingLoop();
}
}
catch (IOException ioe)
{
Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,10 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, u
var frameHeaderSpan = new ReadOnlySpan<byte>(frameHeaderBuffer, 1, 6);
int channel = NetworkOrderDeserializer.ReadUInt16(frameHeaderSpan);
int payloadSize = NetworkOrderDeserializer.ReadInt32(frameHeaderSpan.Slice(2, 4));
if (payloadSize > maxMessageSize)
if ((maxMessageSize > 0) && (payloadSize > maxMessageSize))
{
throw new MalformedFrameException($"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes");
string msg = $"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes";
throw new MalformedFrameException(message: msg, canShutdownCleanly: false);
}

const int EndMarkerLength = 1;
Expand Down
10 changes: 8 additions & 2 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ namespace RabbitMQ.Client
public AmqpTcpEndpoint(string hostName, int portOrMinusOne = -1) { }
public AmqpTcpEndpoint(System.Uri uri, RabbitMQ.Client.SslOption ssl) { }
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl) { }
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl, uint maxMessageSize) { }
public System.Net.Sockets.AddressFamily AddressFamily { get; set; }
public string HostName { get; set; }
public uint MaxMessageSize { get; set; }
public uint MaxMessageSize { get; }
public int Port { get; set; }
public RabbitMQ.Client.IProtocol Protocol { get; }
public RabbitMQ.Client.SslOption Ssl { get; set; }
Expand Down Expand Up @@ -151,6 +152,7 @@ namespace RabbitMQ.Client
{
public const ushort DefaultChannelMax = 2047;
public const uint DefaultFrameMax = 0u;
public const uint DefaultMaxMessageSize = 134217728u;
public const string DefaultPass = "guest";
public const string DefaultUser = "guest";
public const string DefaultVHost = "/";
Expand All @@ -170,6 +172,7 @@ namespace RabbitMQ.Client
public System.Func<System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint>, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; }
public System.TimeSpan HandshakeContinuationTimeout { get; set; }
public string HostName { get; set; }
public uint MaxMessageSize { get; set; }
public System.TimeSpan NetworkRecoveryInterval { get; set; }
public string Password { get; set; }
public int Port { get; set; }
Expand Down Expand Up @@ -208,7 +211,6 @@ namespace RabbitMQ.Client
public const int CommandInvalid = 503;
public const int ConnectionForced = 320;
public const int ContentTooLarge = 311;
public const uint DefaultMaxMessageSizeInBytes = 134217728u;
public const int FrameBody = 3;
public const int FrameEnd = 206;
public const int FrameError = 501;
Expand Down Expand Up @@ -825,11 +827,15 @@ namespace RabbitMQ.Client.Exceptions
}
public abstract class HardProtocolException : RabbitMQ.Client.Exceptions.ProtocolException
{
protected readonly bool _canShutdownCleanly;
protected HardProtocolException(string message) { }
protected HardProtocolException(string message, bool canShutdownCleanly) { }
public bool CanShutdownCleanly { get; }
}
public class MalformedFrameException : RabbitMQ.Client.Exceptions.HardProtocolException
{
public MalformedFrameException(string message) { }
public MalformedFrameException(string message, bool canShutdownCleanly) { }
public override ushort ReplyCode { get; }
}
[System.Serializable]
Expand Down
93 changes: 92 additions & 1 deletion projects/Unit/TestBasicPublish.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Framing;
using Xunit;
using Xunit.Sdk;

namespace RabbitMQ.Client.Unit
{
Expand Down Expand Up @@ -130,5 +131,95 @@ public void CanNotModifyPayloadAfterPublish()
m.BasicCancel(tag);
}
}

[Fact]
public void TestMaxMessageSize()
{
var re = new ManualResetEventSlim();
const ushort maxMsgSize = 1024;

int count = 0;
byte[] msg0 = Encoding.UTF8.GetBytes("hi");

var r = new System.Random();
byte[] msg1 = new byte[maxMsgSize * 2];
r.NextBytes(msg1);

var cf = new ConnectionFactory();
cf.AutomaticRecoveryEnabled = false;
cf.TopologyRecoveryEnabled = false;
cf.MaxMessageSize = maxMsgSize;

bool sawConnectionShutdown = false;
bool sawModelShutdown = false;
bool sawConsumerRegistered = false;
bool sawConsumerCancelled = false;

using (IConnection c = cf.CreateConnection())
{
c.ConnectionShutdown += (o, a) =>
{
sawConnectionShutdown = true;
};

Assert.Equal(maxMsgSize, cf.MaxMessageSize);
Assert.Equal(maxMsgSize, cf.Endpoint.MaxMessageSize);
Assert.Equal(maxMsgSize, c.Endpoint.MaxMessageSize);

using (IModel m = c.CreateModel())
{
m.ModelShutdown += (o, a) =>
{
sawModelShutdown = true;
};

m.CallbackException += (o, a) =>
{
throw new XunitException("Unexpected m.CallbackException");
};

QueueDeclareOk q = m.QueueDeclare();

var consumer = new EventingBasicConsumer(m);

consumer.Shutdown += (o, a) =>
{
re.Set();
};

consumer.Registered += (o, a) =>
{
sawConsumerRegistered = true;
};

consumer.Unregistered += (o, a) =>
{
throw new XunitException("Unexpected consumer.Unregistered");
};

consumer.ConsumerCancelled += (o, a) =>
{
sawConsumerCancelled = true;
};

consumer.Received += (o, a) =>
{
Interlocked.Increment(ref count);
};

string tag = m.BasicConsume(q.QueueName, true, consumer);

m.BasicPublish("", q.QueueName, msg0);
m.BasicPublish("", q.QueueName, msg1);
Assert.True(re.Wait(TimeSpan.FromSeconds(5)));

Assert.Equal(1, count);
Assert.True(sawConnectionShutdown);
Assert.True(sawModelShutdown);
Assert.True(sawConsumerRegistered);
Assert.True(sawConsumerCancelled);
}
}
}
}
}
Loading