From 8b43645cee2d890a99aadd594e14c9122affeec3 Mon Sep 17 00:00:00 2001 From: bollhals Date: Wed, 26 Jun 2024 23:21:30 +0200 Subject: [PATCH 1/2] make IncomingCommand a class and simplify code around it --- .../RabbitMQ.Client/client/RentedMemory.cs | 28 ++++----- .../RabbitMQ.Client/client/framing/Channel.cs | 10 ++-- .../client/impl/ChannelBase.cs | 30 +++++----- .../client/impl/CommandAssembler.cs | 52 +++++++---------- .../client/impl/IncomingCommand.cs | 57 +++++-------------- .../RabbitMQ.Client/client/impl/Session.cs | 5 +- 6 files changed, 65 insertions(+), 117 deletions(-) diff --git a/projects/RabbitMQ.Client/client/RentedMemory.cs b/projects/RabbitMQ.Client/client/RentedMemory.cs index aa2d227be8..37efc44a4c 100644 --- a/projects/RabbitMQ.Client/client/RentedMemory.cs +++ b/projects/RabbitMQ.Client/client/RentedMemory.cs @@ -36,8 +36,6 @@ namespace RabbitMQ.Client { internal struct RentedMemory : IDisposable { - private bool _disposedValue; - internal RentedMemory(byte[] rentedArray) : this(new ReadOnlyMemory(rentedArray), rentedArray) { @@ -47,21 +45,20 @@ internal RentedMemory(ReadOnlyMemory memory, byte[] rentedArray) { Memory = memory; RentedArray = rentedArray; - _disposedValue = false; } - internal readonly ReadOnlyMemory Memory; - - internal readonly byte[] ToArray() - { - return Memory.ToArray(); - } + internal ReadOnlyMemory Memory; internal readonly int Size => Memory.Length; internal readonly ReadOnlySpan Span => Memory.Span; - internal readonly byte[] RentedArray; + internal byte[] RentedArray; + + internal readonly byte[] ToArray() + { + return Memory.ToArray(); + } internal readonly ReadOnlyMemory CopyToMemory() { @@ -70,14 +67,11 @@ internal readonly ReadOnlyMemory CopyToMemory() public void Dispose() { - if (!_disposedValue) + if (RentedArray != null) { - if (RentedArray != null) - { - ArrayPool.Shared.Return(RentedArray); - } - - _disposedValue = true; + ArrayPool.Shared.Return(RentedArray); + RentedArray = default; + Memory = default; } } } diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index c6291eb503..7370f477c1 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -88,17 +88,17 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.BasicAck: { - HandleBasicAck(in cmd); + HandleBasicAck(cmd); return Task.FromResult(true); } case ProtocolCommandId.BasicNack: { - HandleBasicNack(in cmd); + HandleBasicNack(cmd); return Task.FromResult(true); } case ProtocolCommandId.BasicReturn: { - HandleBasicReturn(in cmd); + HandleBasicReturn(cmd); return Task.FromResult(true); } case ProtocolCommandId.ChannelClose: @@ -118,7 +118,7 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.ConnectionBlocked: { - HandleConnectionBlocked(in cmd); + HandleConnectionBlocked(cmd); return Task.FromResult(true); } case ProtocolCommandId.ConnectionClose: @@ -143,7 +143,7 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.ConnectionUnblocked: { - HandleConnectionUnblocked(in cmd); + HandleConnectionUnblocked(cmd); return Task.FromResult(true); } default: diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index db66cce057..98297b4fe1 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -54,7 +54,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable internal TaskCompletionSource m_connectionStartCell; private Exception m_connectionStartException = null; - // AMQP only allows one RPC operation to be active at a time. + // AMQP only allows one RPC operation to be active at a time. protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1); private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true); @@ -425,8 +425,8 @@ private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken can /* * If DispatchCommandAsync returns `true`, it means that the incoming command is server-originated, and has * already been handled. - * - * Else, the incoming command is the return of an RPC call, and must be handled. + * + * Else, the incoming command is the return of an RPC call, and must be handled. */ if (false == await DispatchCommandAsync(cmd, cancellationToken) .ConfigureAwait(false)) @@ -561,7 +561,7 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart return ModelSendAsync(method, cancellationToken).AsTask(); } - protected void HandleBasicAck(in IncomingCommand cmd) + protected void HandleBasicAck(IncomingCommand cmd) { try { @@ -580,7 +580,7 @@ protected void HandleBasicAck(in IncomingCommand cmd) } } - protected void HandleBasicNack(in IncomingCommand cmd) + protected void HandleBasicNack(IncomingCommand cmd) { try { @@ -679,17 +679,17 @@ await ConsumerDispatcher.HandleBasicDeliverAsync( method._exchange, method._routingKey, header, - cmd.Body, + /* + * Takeover Body so it doesn't get returned as it is necessary + * for handling the Basic.Deliver method by client code. + */ + cmd.TakeoverBody(), cancellationToken).ConfigureAwait(false); return true; } finally { - /* - * Note: do not return the Body as it is necessary for handling - * the Basic.Deliver method by client code - */ - cmd.ReturnMethodAndHeaderBuffers(); + cmd.ReturnBuffers(); } } @@ -698,7 +698,7 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) return deliveryTag; } - protected void HandleBasicReturn(in IncomingCommand cmd) + protected void HandleBasicReturn(IncomingCommand cmd) { try { @@ -800,7 +800,7 @@ await ModelSendAsync(method, cancellationToken). } } - protected void HandleConnectionBlocked(in IncomingCommand cmd) + protected void HandleConnectionBlocked(IncomingCommand cmd) { try { @@ -851,7 +851,7 @@ await ModelSendAsync(replyMethod, cancellationToken) protected async Task HandleConnectionSecureAsync(IncomingCommand _) { var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); - await k.HandleCommandAsync(IncomingCommand.Empty) + await k.HandleCommandAsync(new IncomingCommand()) .ConfigureAwait(false); // release the continuation. return true; } @@ -903,7 +903,7 @@ await k.HandleCommandAsync(cmd) return true; } - protected void HandleConnectionUnblocked(in IncomingCommand cmd) + protected void HandleConnectionUnblocked(IncomingCommand cmd) { try { diff --git a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs index 5c09bf5319..5dbc762514 100644 --- a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs +++ b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs @@ -44,34 +44,19 @@ internal sealed class CommandAssembler { private const int MaxArrayOfBytesSize = 2_147_483_591; - private ProtocolCommandId _commandId; - private RentedMemory _methodMemory; - private RentedMemory _headerMemory; - private RentedMemory _bodyMemory; + private readonly IncomingCommand _currentCommand; + private readonly uint _maxBodyLength; + private int _remainingBodyByteCount; - private int _offset; private AssemblyState _state; - private readonly uint _maxBodyLength; - public CommandAssembler(uint maxBodyLength) { + _currentCommand = new IncomingCommand(); _maxBodyLength = maxBodyLength; - Reset(); - } - - private void Reset() - { - _commandId = default; - _methodMemory = default; - _headerMemory = default; - _bodyMemory = default; - _remainingBodyByteCount = 0; - _offset = 0; - _state = AssemblyState.ExpectingMethod; } - public void HandleFrame(InboundFrame frame, out IncomingCommand command) + public IncomingCommand? HandleFrame(InboundFrame frame) { switch (_state) { @@ -88,13 +73,14 @@ public void HandleFrame(InboundFrame frame, out IncomingCommand command) if (_state != AssemblyState.Complete) { - command = IncomingCommand.Empty; - return; + return default; } RabbitMqClientEventSource.Log.CommandReceived(); - command = new IncomingCommand(_commandId, _methodMemory, _headerMemory, _bodyMemory); - Reset(); + _remainingBodyByteCount = 0; + _state = AssemblyState.ExpectingMethod; + + return _currentCommand; } private void ParseMethodFrame(InboundFrame frame) @@ -104,10 +90,10 @@ private void ParseMethodFrame(InboundFrame frame) throw new UnexpectedFrameException(frame.Type); } - _commandId = (ProtocolCommandId)NetworkOrderDeserializer.ReadUInt32(frame.Payload.Span); - _methodMemory = frame.TakeoverPayload(Framing.Method.ArgumentsOffset); + _currentCommand.CommandId = (ProtocolCommandId)NetworkOrderDeserializer.ReadUInt32(frame.Payload.Span); + _currentCommand.Method = frame.TakeoverPayload(Framing.Method.ArgumentsOffset); - switch (_commandId) + switch (_currentCommand.CommandId) { // Commands with payload case ProtocolCommandId.BasicGetOk: @@ -154,7 +140,7 @@ private void ParseHeaderFrame(InboundFrame frame) } else { - _headerMemory = frame.TakeoverPayload(Framing.Header.HeaderArgumentOffset); + _currentCommand.Header = frame.TakeoverPayload(Framing.Header.HeaderArgumentOffset); } _remainingBodyByteCount = (int)totalBodyBytes; @@ -174,25 +160,25 @@ private void ParseBodyFrame(InboundFrame frame) throw new MalformedFrameException($"Overlong content body received - {_remainingBodyByteCount} bytes remaining, {payloadLength} bytes received"); } - if (_bodyMemory.RentedArray is null) + if (_currentCommand.Body.RentedArray is null) { // check for single frame payload for an early exit if (payloadLength == _remainingBodyByteCount) { - _bodyMemory = frame.TakeoverPayload(0); + _currentCommand.Body = frame.TakeoverPayload(0); _state = AssemblyState.Complete; return; } // Is returned by IncomingCommand.ReturnPayload in Session.HandleFrame var rentedBodyArray = ArrayPool.Shared.Rent(_remainingBodyByteCount); - _bodyMemory = new RentedMemory(new ReadOnlyMemory(rentedBodyArray, 0, _remainingBodyByteCount), rentedBodyArray); + _currentCommand.Body.RentedArray = rentedBodyArray; + _currentCommand.Body.Memory = new ReadOnlyMemory(rentedBodyArray, 0, _remainingBodyByteCount); } - frame.Payload.Span.CopyTo(_bodyMemory.RentedArray.AsSpan(_offset)); + frame.Payload.Span.CopyTo(_currentCommand.Body.RentedArray.AsSpan(_currentCommand.Body.Memory.Length - _remainingBodyByteCount)); frame.TryReturnPayload(); _remainingBodyByteCount -= payloadLength; - _offset += payloadLength; UpdateContentBodyState(); } diff --git a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs index fdfb2534af..128c9a6d34 100644 --- a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs +++ b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs @@ -34,60 +34,29 @@ namespace RabbitMQ.Client.Impl { - internal readonly struct IncomingCommand + internal sealed class IncomingCommand { - public static readonly IncomingCommand Empty = default; + public ProtocolCommandId CommandId; - public readonly ProtocolCommandId CommandId; + public RentedMemory Method; + public RentedMemory Header; + public RentedMemory Body; - public readonly RentedMemory Method; - public readonly RentedMemory Header; - public readonly RentedMemory Body; + public ReadOnlySpan MethodSpan => Method.Memory.Span; + public ReadOnlySpan HeaderSpan => Header.Memory.Span; + public ReadOnlySpan BodySpan => Body.Memory.Span; - public readonly bool IsEmpty => CommandId is default(ProtocolCommandId); - - public IncomingCommand(ProtocolCommandId commandId, - RentedMemory method, RentedMemory header, RentedMemory body) - { - CommandId = commandId; - Method = method; - Header = header; - Body = body; - } - - public ReadOnlySpan MethodSpan + public RentedMemory TakeoverBody() { - get - { - return Method.Memory.Span; - } + var body = Body; + Body = default; + return body; } - public ReadOnlySpan HeaderSpan - { - get - { - return Header.Memory.Span; - } - } - - public ReadOnlySpan BodySpan - { - get - { - return Body.Memory.Span; - } - } - - public void ReturnMethodAndHeaderBuffers() + public void ReturnBuffers() { Method.Dispose(); Header.Dispose(); - } - - public void ReturnBuffers() - { - ReturnMethodAndHeaderBuffers(); Body.Dispose(); } } diff --git a/projects/RabbitMQ.Client/client/impl/Session.cs b/projects/RabbitMQ.Client/client/impl/Session.cs index a85d36c1bf..4168b1d556 100644 --- a/projects/RabbitMQ.Client/client/impl/Session.cs +++ b/projects/RabbitMQ.Client/client/impl/Session.cs @@ -48,9 +48,8 @@ public Session(Connection connection, ushort channelNumber, uint maxBodyLength) public override Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken) { - _assembler.HandleFrame(frame, out IncomingCommand cmd); - - if (cmd.IsEmpty) + var cmd = _assembler.HandleFrame(frame); + if (cmd is null) { return Task.CompletedTask; } From 462513d6c5073d9bb2270263a7a9af41944d374e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 8 Jul 2024 07:40:35 -0700 Subject: [PATCH 2/2] Prefer types over `var` --- projects/RabbitMQ.Client/client/impl/IncomingCommand.cs | 2 +- projects/RabbitMQ.Client/client/impl/Session.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs index 128c9a6d34..804c259272 100644 --- a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs +++ b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs @@ -48,7 +48,7 @@ internal sealed class IncomingCommand public RentedMemory TakeoverBody() { - var body = Body; + RentedMemory body = Body; Body = default; return body; } diff --git a/projects/RabbitMQ.Client/client/impl/Session.cs b/projects/RabbitMQ.Client/client/impl/Session.cs index 4168b1d556..29be35c6d9 100644 --- a/projects/RabbitMQ.Client/client/impl/Session.cs +++ b/projects/RabbitMQ.Client/client/impl/Session.cs @@ -48,7 +48,7 @@ public Session(Connection connection, ushort channelNumber, uint maxBodyLength) public override Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken) { - var cmd = _assembler.HandleFrame(frame); + IncomingCommand cmd = _assembler.HandleFrame(frame); if (cmd is null) { return Task.CompletedTask;