Skip to content

Commit

Permalink
Adding pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefán Jökull Sigurðarson committed Apr 25, 2022
1 parent 9ccf87a commit 51b3084
Show file tree
Hide file tree
Showing 25 changed files with 733 additions and 566 deletions.
171 changes: 171 additions & 0 deletions projects/Benchmarks/ArrayBufferWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// We only need this if we aren't targeting .NET 6.0 or greater since it already exists there
#if !NET6_0_OR_GREATER
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.Buffers
{
public class ArrayBufferWriter<T> : IBufferWriter<T>, IDisposable
{
private T[] _rentedBuffer;
private int _written;
private long _committed;

private const int MinimumBufferSize = 256;

public ArrayBufferWriter(int initialCapacity = MinimumBufferSize)
{
if (initialCapacity <= 0)
{
throw new ArgumentException(null, nameof(initialCapacity));
}

_rentedBuffer = ArrayPool<T>.Shared.Rent(initialCapacity);
_written = 0;
_committed = 0;
}

public Memory<T> WrittenMemory
{
get
{
CheckIfDisposed();

return _rentedBuffer.AsMemory(0, _written);
}
}

public Span<T> WrittenSpan
{
get
{
CheckIfDisposed();

return _rentedBuffer.AsSpan(0, _written);
}
}

public int BytesWritten
{
get
{
CheckIfDisposed();

return _written;
}
}

public long BytesCommitted
{
get
{
CheckIfDisposed();

return _committed;
}
}

public void Clear()
{
CheckIfDisposed();

ClearHelper();
}

private void ClearHelper()
{
_rentedBuffer.AsSpan(0, _written).Clear();
_written = 0;
}

public void Advance(int count)
{
CheckIfDisposed();

if (count < 0)
throw new ArgumentException(nameof(count));

if (_written > _rentedBuffer.Length - count)
throw new InvalidOperationException("Cannot advance past the end of the buffer.");

_written += count;
}

// Returns the rented buffer back to the pool
public void Dispose()
{
if (_rentedBuffer == null)
{
return;
}

ArrayPool<T>.Shared.Return(_rentedBuffer, clearArray: true);
_rentedBuffer = null;
_written = 0;
}

private void CheckIfDisposed()
{
if (_rentedBuffer == null)
throw new ObjectDisposedException(nameof(ArrayBufferWriter<T>));
}

public Memory<T> GetMemory(int sizeHint = 0)
{
CheckIfDisposed();

if (sizeHint < 0)
throw new ArgumentException(nameof(sizeHint));

CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsMemory(_written);
}

public Span<T> GetSpan(int sizeHint = 0)
{
CheckIfDisposed();

if (sizeHint < 0)
throw new ArgumentException(nameof(sizeHint));

CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsSpan(_written);
}

private void CheckAndResizeBuffer(int sizeHint)
{
Debug.Assert(sizeHint >= 0);

if (sizeHint == 0)
{
sizeHint = MinimumBufferSize;
}

int availableSpace = _rentedBuffer.Length - _written;

if (sizeHint > availableSpace)
{
int growBy = sizeHint > _rentedBuffer.Length ? sizeHint : _rentedBuffer.Length;

int newSize = checked(_rentedBuffer.Length + growBy);

T[] oldBuffer = _rentedBuffer;

_rentedBuffer = ArrayPool<T>.Shared.Rent(newSize);

Debug.Assert(oldBuffer.Length >= _written);
Debug.Assert(_rentedBuffer.Length >= _written);

oldBuffer.AsSpan(0, _written).CopyTo(_rentedBuffer);
ArrayPool<T>.Shared.Return(oldBuffer, clearArray: true);
}

Debug.Assert(_rentedBuffer.Length - _written > 0);
Debug.Assert(_rentedBuffer.Length - _written >= sizeHint);
}
}
}
#endif
36 changes: 31 additions & 5 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.Text;

using BenchmarkDotNet.Attributes;
Expand All @@ -19,7 +20,12 @@ public class MethodFramingBasicAck
public ushort Channel { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
public ReadOnlyMemory<byte> BasicAckWrite()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _basicAck, _writer, Channel);
return _writer.WrittenMemory;
}
}

[Config(typeof(Config))]
Expand All @@ -41,13 +47,28 @@ public class MethodFramingBasicPublish
public int FrameMax { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, _writer, Channel, FrameMax);
return _writer.WrittenMemory;
}

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishWrite()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, _writer, Channel, FrameMax);
return _writer.WrittenMemory;
}

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishMemoryWrite()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, _writer, Channel, FrameMax);
return _writer.WrittenMemory;
}
}

[Config(typeof(Config))]
Expand All @@ -60,6 +81,11 @@ public class MethodFramingChannelClose
public ushort Channel { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
public ReadOnlyMemory<byte> ChannelCloseWrite()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _channelClose, _writer, Channel);
return _writer.WrittenMemory;
}
}
}
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@
<PackageReference Include="MinVer" Version="3.1.0" PrivateAssets="All" />
<PackageReference Include="System.Memory" Version="4.5.4" />
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.2.2" />
</ItemGroup>

</Project>
</Project>
11 changes: 5 additions & 6 deletions projects/RabbitMQ.Client/client/api/ConnectionFactoryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

using System;
using System.Net.Sockets;

using Pipelines.Sockets.Unofficial;

using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
Expand All @@ -49,12 +52,8 @@ public class ConnectionFactoryBase
/// <returns>New instance of a <see cref="TcpClient"/>.</returns>
public static ITcpClient DefaultSocketFactory(AddressFamily addressFamily)
{
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
{
NoDelay = true,
ReceiveBufferSize = 65536,
SendBufferSize = 65536
};
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
SocketConnection.SetRecommendedClientOptions(socket);
return new TcpClientAdapter(socket);
}
}
Expand Down
Loading

0 comments on commit 51b3084

Please sign in to comment.