Skip to content

Commit

Permalink
Merge pull request #1226 from rabbitmq/rabbitmq-dotnet-client-1225
Browse files Browse the repository at this point in the history
Merge pull request #1224 from rabbitmq/rabbitmq-dotnet-client-1223-6.x
  • Loading branch information
michaelklishin authored Jun 17, 2022
2 parents fd95377 + cf2b303 commit e5e960b
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 27 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
_site/

###################
## Generated files
###################
Expand Down
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
## Changes Between 6.3.1 and 6.4.0

This release adds the ability to specify a maximum message size when receiving data. The default
values are:

* RabbitMQ .NET client 7.0.0 and beyond: 128MiB
* RabbitMQ .NET client 6.4.0 up to 7.0.0: no limit by default

Receiving a frame that specifies a content larger than the limit will throw an execption. This is to
help prevent situations as described in [this discussion](https://github.com/rabbitmq/rabbitmq-dotnet-client/discussions/1213).

To set a limit, use the set `MaxMessageSize` on your `ConnectionFactory` before opening connections:

```
// This sets the limit to 512MiB
var cf = new ConnectionFactory();
cf.MaxMessageSize = 536870912;
var conn = cf.CreateConnection()`
```

GitHub milestone: [`6.4.0`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/58?closed=1)
Diff: [link](https://github.com/rabbitmq/rabbitmq-dotnet-client/compare/v6.3.1...v6.4.0)

## Changes Between 6.3.0 and 6.3.1

GitHub milestone: [`6.3.1`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/57?closed=1)
Expand Down
35 changes: 19 additions & 16 deletions projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,33 @@ public class AmqpTcpEndpoint// : ICloneable
public const int UseDefaultPort = -1;

private int _port;
private uint _maxMessageSize = Constants.DefaultMaxMessageSizeInBytes;

private readonly uint _maxMessageSize;

/// <summary>
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
/// </summary>
/// <param name="hostName">Hostname.</param>
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
/// <param name="ssl">Ssl option.</param>
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl)
/// <param name="maxMessageSize">Maximum message size from RabbitMQ. 0 means "unlimited"</param>
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl, uint maxMessageSize)
{
HostName = hostName;
_port = portOrMinusOne;
Ssl = ssl;
_maxMessageSize = maxMessageSize;
}

/// <summary>
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
/// </summary>
/// <param name="hostName">Hostname.</param>
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
/// <param name="ssl">Ssl option.</param>
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl) :
this(hostName, portOrMinusOne, ssl, ConnectionFactory.DefaultMaxMessageSize)
{
}

/// <summary>
Expand Down Expand Up @@ -119,7 +133,7 @@ public AmqpTcpEndpoint(Uri uri) : this(uri.Host, uri.Port)
/// <returns>A copy with the same hostname, port, and TLS settings</returns>
public object Clone()
{
return new AmqpTcpEndpoint(HostName, _port, Ssl);
return new AmqpTcpEndpoint(HostName, _port, Ssl, _maxMessageSize);
}

/// <summary>
Expand All @@ -129,7 +143,7 @@ public object Clone()
/// <returns>A copy with the provided hostname and port/TLS settings of this endpoint</returns>
public AmqpTcpEndpoint CloneWithHostname(string hostname)
{
return new AmqpTcpEndpoint(hostname, _port, Ssl);
return new AmqpTcpEndpoint(hostname, _port, Ssl, _maxMessageSize);
}

/// <summary>
Expand Down Expand Up @@ -179,22 +193,11 @@ public IProtocol Protocol
public SslOption Ssl { get; set; }

/// <summary>
/// Set the maximum size for a message in bytes. Setting it to 0 reverts to the default of 128MiB
/// Get the maximum size for a message in bytes. The default value is 128MiB to match RabbitMQ's default
/// </summary>
public uint MaxMessageSize
{
get { return _maxMessageSize; }
set
{
if (value == default(uint))
{
_maxMessageSize = Constants.DefaultMaxMessageSizeInBytes;
}
else
{
_maxMessageSize = value;
}
}
}

/// <summary>
Expand Down
17 changes: 16 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace RabbitMQ.Client
/// factory.VirtualHost = ConnectionFactory.DefaultVHost;
/// factory.HostName = hostName;
/// factory.Port = AmqpTcpEndpoint.UseDefaultPort;
/// factory.MaxMessageSize = 512 * 1024 * 1024;
/// //
/// IConnection conn = factory.CreateConnection();
/// //
Expand Down Expand Up @@ -103,6 +104,13 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
public const uint DefaultFrameMax = 0;

/// <summary>
/// Default value for the maximum allowed message size, in bytes, from RabbitMQ.
/// Corresponds to the <code>rabbit.max_message_size</code> setting.
/// Note: the default is 0 which means "unlimited".
/// </summary>
public const uint DefaultMaxMessageSize = 134217728;

/// <summary>
/// Default value for desired heartbeat interval. Default is 60 seconds,
/// TimeSpan.Zero means "heartbeats are disabled".
Expand Down Expand Up @@ -264,12 +272,13 @@ public ConnectionFactory()
/// </summary>
public AmqpTcpEndpoint Endpoint
{
get { return new AmqpTcpEndpoint(HostName, Port, Ssl); }
get { return new AmqpTcpEndpoint(HostName, Port, Ssl, MaxMessageSize); }
set
{
Port = value.Port;
HostName = value.HostName;
Ssl = value.Ssl;
MaxMessageSize = value.MaxMessageSize;
}
}

Expand Down Expand Up @@ -317,6 +326,12 @@ public AmqpTcpEndpoint Endpoint
/// </summary>
public string VirtualHost { get; set; } = DefaultVHost;

/// <summary>
/// Maximum allowed message size, in bytes, from RabbitMQ.
/// Corresponds to the <code>rabbit.max_message_size</code> setting.
/// </summary>
public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize;

/// <summary>
/// The uri to use for the connection.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,20 @@ namespace RabbitMQ.Client.Exceptions
///requiring a connection.close.</summary>
public abstract class HardProtocolException : ProtocolException
{
protected readonly bool _canShutdownCleanly = true;

protected HardProtocolException(string message) : base(message)
{
}

protected HardProtocolException(string message, bool canShutdownCleanly) : base(message)
{
_canShutdownCleanly = canShutdownCleanly;
}

public bool CanShutdownCleanly
{
get { return _canShutdownCleanly; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public MalformedFrameException(string message) : base(message)
{
}

public MalformedFrameException(string message, bool canShutdownCleanly) :
base(message, canShutdownCleanly)
{
}

public override ushort ReplyCode
{
get { return Constants.FrameError; }
Expand Down
3 changes: 0 additions & 3 deletions projects/RabbitMQ.Client/client/framing/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,5 @@ public static class Constants
public const int NotImplemented = 540;
///<summary>(= 541)</summary>
public const int InternalError = 541;

///<summary>(= 134217728)</summary>
public const uint DefaultMaxMessageSizeInBytes = 134217728;
}
}
5 changes: 4 additions & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
{
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
ClosingLoop();
if (hpe.CanShutdownCleanly)
{
ClosingLoop();
}
}
catch (IOException ioe)
{
Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,10 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, u
var frameHeaderSpan = new ReadOnlySpan<byte>(frameHeaderBuffer, 1, 6);
int channel = NetworkOrderDeserializer.ReadUInt16(frameHeaderSpan);
int payloadSize = NetworkOrderDeserializer.ReadInt32(frameHeaderSpan.Slice(2, 4));
if (payloadSize > maxMessageSize)
if ((maxMessageSize > 0) && (payloadSize > maxMessageSize))
{
throw new MalformedFrameException($"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes");
string msg = $"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes";
throw new MalformedFrameException(message: msg, canShutdownCleanly: false);
}

const int EndMarkerLength = 1;
Expand Down
10 changes: 8 additions & 2 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ namespace RabbitMQ.Client
public AmqpTcpEndpoint(string hostName, int portOrMinusOne = -1) { }
public AmqpTcpEndpoint(System.Uri uri, RabbitMQ.Client.SslOption ssl) { }
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl) { }
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl, uint maxMessageSize) { }
public System.Net.Sockets.AddressFamily AddressFamily { get; set; }
public string HostName { get; set; }
public uint MaxMessageSize { get; set; }
public uint MaxMessageSize { get; }
public int Port { get; set; }
public RabbitMQ.Client.IProtocol Protocol { get; }
public RabbitMQ.Client.SslOption Ssl { get; set; }
Expand Down Expand Up @@ -151,6 +152,7 @@ namespace RabbitMQ.Client
{
public const ushort DefaultChannelMax = 2047;
public const uint DefaultFrameMax = 0u;
public const uint DefaultMaxMessageSize = 134217728u;
public const string DefaultPass = "guest";
public const string DefaultUser = "guest";
public const string DefaultVHost = "/";
Expand All @@ -170,6 +172,7 @@ namespace RabbitMQ.Client
public System.Func<System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint>, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; }
public System.TimeSpan HandshakeContinuationTimeout { get; set; }
public string HostName { get; set; }
public uint MaxMessageSize { get; set; }
public System.TimeSpan NetworkRecoveryInterval { get; set; }
public string Password { get; set; }
public int Port { get; set; }
Expand Down Expand Up @@ -208,7 +211,6 @@ namespace RabbitMQ.Client
public const int CommandInvalid = 503;
public const int ConnectionForced = 320;
public const int ContentTooLarge = 311;
public const uint DefaultMaxMessageSizeInBytes = 134217728u;
public const int FrameBody = 3;
public const int FrameEnd = 206;
public const int FrameError = 501;
Expand Down Expand Up @@ -825,11 +827,15 @@ namespace RabbitMQ.Client.Exceptions
}
public abstract class HardProtocolException : RabbitMQ.Client.Exceptions.ProtocolException
{
protected readonly bool _canShutdownCleanly;
protected HardProtocolException(string message) { }
protected HardProtocolException(string message, bool canShutdownCleanly) { }
public bool CanShutdownCleanly { get; }
}
public class MalformedFrameException : RabbitMQ.Client.Exceptions.HardProtocolException
{
public MalformedFrameException(string message) { }
public MalformedFrameException(string message, bool canShutdownCleanly) { }
public override ushort ReplyCode { get; }
}
[System.Serializable]
Expand Down
93 changes: 92 additions & 1 deletion projects/Unit/TestBasicPublish.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Framing;
using Xunit;
using Xunit.Sdk;

namespace RabbitMQ.Client.Unit
{
Expand Down Expand Up @@ -130,5 +131,95 @@ public void CanNotModifyPayloadAfterPublish()
m.BasicCancel(tag);
}
}

[Fact]
public void TestMaxMessageSize()
{
var re = new ManualResetEventSlim();
const ushort maxMsgSize = 1024;

int count = 0;
byte[] msg0 = Encoding.UTF8.GetBytes("hi");

var r = new System.Random();
byte[] msg1 = new byte[maxMsgSize * 2];
r.NextBytes(msg1);

var cf = new ConnectionFactory();
cf.AutomaticRecoveryEnabled = false;
cf.TopologyRecoveryEnabled = false;
cf.MaxMessageSize = maxMsgSize;

bool sawConnectionShutdown = false;
bool sawModelShutdown = false;
bool sawConsumerRegistered = false;
bool sawConsumerCancelled = false;

using (IConnection c = cf.CreateConnection())
{
c.ConnectionShutdown += (o, a) =>
{
sawConnectionShutdown = true;
};

Assert.Equal(maxMsgSize, cf.MaxMessageSize);
Assert.Equal(maxMsgSize, cf.Endpoint.MaxMessageSize);
Assert.Equal(maxMsgSize, c.Endpoint.MaxMessageSize);

using (IModel m = c.CreateModel())
{
m.ModelShutdown += (o, a) =>
{
sawModelShutdown = true;
};

m.CallbackException += (o, a) =>
{
throw new XunitException("Unexpected m.CallbackException");
};

QueueDeclareOk q = m.QueueDeclare();

var consumer = new EventingBasicConsumer(m);

consumer.Shutdown += (o, a) =>
{
re.Set();
};

consumer.Registered += (o, a) =>
{
sawConsumerRegistered = true;
};

consumer.Unregistered += (o, a) =>
{
throw new XunitException("Unexpected consumer.Unregistered");
};

consumer.ConsumerCancelled += (o, a) =>
{
sawConsumerCancelled = true;
};

consumer.Received += (o, a) =>
{
Interlocked.Increment(ref count);
};

string tag = m.BasicConsume(q.QueueName, true, consumer);

m.BasicPublish("", q.QueueName, msg0);
m.BasicPublish("", q.QueueName, msg1);
Assert.True(re.Wait(TimeSpan.FromSeconds(5)));

Assert.Equal(1, count);
Assert.True(sawConnectionShutdown);
Assert.True(sawModelShutdown);
Assert.True(sawConsumerRegistered);
Assert.True(sawConsumerCancelled);
}
}
}
}
}
Loading

0 comments on commit e5e960b

Please sign in to comment.