From 27ccaed9a51622a9eaa70a6711306ca1d7664688 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 2 Dec 2021 00:34:57 +0700 Subject: [PATCH 1/9] Fix UDP memory leak --- .../IO/UdpConnectedIntegrationSpec.cs | 84 ++++++++++++++++++- src/core/Akka.Tests/IO/UdpIntegrationSpec.cs | 57 ++++++++++++- src/core/Akka/Actor/BuiltInActors.cs | 10 +-- src/core/Akka/Actor/EmptyLocalActorRef.cs | 75 +++++++++-------- src/core/Akka/IO/Buffers/DirectBufferPool.cs | 37 +++++++- .../Akka/IO/Buffers/DisabledBufferPool.cs | 5 +- src/core/Akka/IO/SocketEventArgsPool.cs | 71 +++++++++++----- src/core/Akka/IO/Udp.cs | 56 +++++++------ src/core/Akka/IO/UdpConnected.cs | 25 +++--- src/core/Akka/IO/UdpConnectedManager.cs | 17 ---- src/core/Akka/IO/UdpConnection.cs | 58 +++++++------ src/core/Akka/IO/UdpListener.cs | 48 +++-------- src/core/Akka/IO/WithUdpSend.cs | 5 +- src/core/Akka/Util/ByteString.cs | 5 ++ 14 files changed, 361 insertions(+), 192 deletions(-) diff --git a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs index 3f79b8a8d9c..54ad93e90f0 100644 --- a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs @@ -8,8 +8,10 @@ using System; using System.Linq; using System.Net; +using System.Threading; using Akka.Actor; using Akka.IO; +using Akka.IO.Buffers; using Akka.TestKit; using FluentAssertions; using FluentAssertions.Extensions; @@ -26,12 +28,20 @@ public UdpConnectedIntegrationSpec(ITestOutputHelper output) : base(@" akka.actor.serialize-creators = on akka.actor.serialize-messages = on + + akka.io.udp-connected.buffer-pool = ""akka.io.udp-connected.direct-buffer-pool"" akka.io.udp-connected.nr-of-selectors = 1 - akka.io.udp-connected.direct-buffer-pool-limit = 100 - akka.io.udp-connected.direct-buffer-size = 1024 + # This comes out to be about 1.6 Mib maximum total buffer size + akka.io.udp-connected.direct-buffer-pool.buffer-size = 512 + akka.io.udp-connected.direct-buffer-pool.buffers-per-segment = 32 + akka.io.udp-connected.direct-buffer-pool.buffer-pool-limit = 100 + + akka.io.udp.buffer-pool = ""akka.io.udp.direct-buffer-pool"" akka.io.udp.nr-of-selectors = 1 - akka.io.udp.direct-buffer-pool-limit = 100 - akka.io.udp.direct-buffer-size = 1024 + # This comes out to be about 1.6 Mib maximum total buffer size + akka.io.udp.direct-buffer-pool.buffer-size = 512 + akka.io.udp.direct-buffer-pool.buffers-per-segment = 32 + akka.io.udp.direct-buffer-pool.buffer-pool-limit = 100 akka.io.udp.trace-logging = true akka.loglevel = DEBUG", output) { @@ -141,5 +151,71 @@ public void The_UDP_connection_oriented_implementation_must_to_send_batch_writes msgs = raw.Cast(); msgs.Sum(x => x.Data.Count).Should().Be(data.Count * 3); } + + [Fact] + public void The_UDP_connection_oriented_implementation_must_not_leak_memory() + { + const int batchSize = 5000; + + var serverAddress = _addresses[0]; + var clientAddress = _addresses[1]; + var udpConnection = UdpConnected.Instance.Apply(Sys); + + var poolInfo = udpConnection.SocketEventArgsPool.BufferPoolInfo; + poolInfo.Type.Should().Be(typeof(DirectBufferPool)); + poolInfo.Free.Should().Be(poolInfo.TotalSize); + poolInfo.Used.Should().Be(0); + + var server = CreateTestProbe(); + udpConnection.Manager.Tell(new UdpConnected.Connect(server, clientAddress, serverAddress), server); + server.ExpectMsg(); + var serverEp = server.LastSender; + + var client = CreateTestProbe(); + udpConnection.Manager.Tell(new UdpConnected.Connect(client, serverAddress, clientAddress), client); + client.ExpectMsg(); + var clientEp = client.LastSender; + + var data = ByteString.FromString("Fly little packet!"); + + // send a lot of packets through, the byte buffer pool should not leak anything + for (var j = 0; j < batchSize; ++j) + { + serverEp.Tell(UdpConnected.Send.Create(data)); + } + + var msgs = client.ReceiveN(batchSize, TimeSpan.FromSeconds(10)); + var cast = msgs.Cast(); + cast.Sum(m => m.Data.Count).Should().Be(data.Count * batchSize); + + // stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected + serverEp.Tell(UdpConnected.Disconnect.Instance, server); + server.ExpectMsg(); + clientEp.Tell(UdpConnected.Disconnect.Instance, client); + client.ExpectMsg(); + + // wait for all SocketAsyncEventArgs to be released + Thread.Sleep(1000); + + poolInfo = udpConnection.SocketEventArgsPool.BufferPoolInfo; + poolInfo.Type.Should().Be(typeof(DirectBufferPool)); + poolInfo.Free.Should().Be(poolInfo.TotalSize); + poolInfo.Used.Should().Be(0); + } + + internal class UdpClientHandler: ReceiveActor + { + public int ReceivedMessages; + public int TotalData; + public UdpClientHandler() + { + Receive(rcv => + { + ReceivedMessages++; + TotalData += rcv.Data.Count; + }); + ReceiveAny(obj => throw new Exception("Invalid message")); + } + } } } diff --git a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs index 4cea5cd7d79..795cb42c792 100644 --- a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs @@ -9,8 +9,10 @@ using System.Linq; using System.Net; using System.Net.Sockets; +using System.Threading; using Akka.Actor; using Akka.IO; +using Akka.IO.Buffers; using Akka.TestKit; using Akka.Util.Internal; using Xunit; @@ -31,8 +33,15 @@ public UdpIntegrationSpec(ITestOutputHelper output) akka.actor.serialize-messages = on akka.io.udp.max-channels = unlimited akka.io.udp.nr-of-selectors = 1 - akka.io.udp.direct-buffer-pool-limit = 100 - akka.io.udp.direct-buffer-size = 1024", output) + + akka.io.udp.buffer-pool = ""akka.io.udp.direct-buffer-pool"" + akka.io.udp.nr-of-selectors = 1 + # This comes out to be about 1.6 Mib maximum total buffer size + akka.io.udp.direct-buffer-pool.buffer-size = 512 + akka.io.udp.direct-buffer-pool.buffers-per-segment = 32 + akka.io.udp.direct-buffer-pool.buffer-pool-limit = 100 + # akka.io.udp.trace-logging = true + akka.loglevel = DEBUG", output) { _addresses = TestUtils.TemporaryServerAddresses(6, udp: true).ToArray(); } @@ -194,6 +203,50 @@ void CheckSendingToServer(ByteString expected) for (int i = 0; i < iterations; i++) CheckSendingToClient(data[i]); } + [Fact] + public void The_UDP_Fire_and_Forget_implementation_must_not_leak_memory() + { + const int batchSize = 5000; + + var serverAddress = _addresses[0]; + var clientAddress = _addresses[1]; + + var udp = Udp.Instance.Apply(Sys); + var poolInfo = udp.SocketEventArgsPool.BufferPoolInfo; + poolInfo.Type.Should().Be(typeof(DirectBufferPool)); + poolInfo.Free.Should().Be(poolInfo.TotalSize); + poolInfo.Used.Should().Be(0); + + var serverProbe = CreateTestProbe(); + var server = BindUdp(serverAddress, serverProbe); + var clientProbe = CreateTestProbe(); + var client = BindUdp(clientAddress, clientProbe); + + var data = ByteString.FromString("Fly little packet!"); + + // send a lot of packets through, the byte buffer pool should not leak anything + for (int i = 0; i < batchSize; i++) + server.Tell(Udp.Send.Create(data, clientAddress)); + + var msgs = clientProbe.ReceiveN(batchSize); + var receives = msgs.Cast(); + receives.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize); + + // stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected + server.Tell(Udp.Unbind.Instance, serverProbe); + serverProbe.ExpectMsg(); + client.Tell(Udp.Unbind.Instance, clientProbe); + clientProbe.ExpectMsg(); + + // wait for all SocketAsyncEventArgs to be released + Thread.Sleep(1000); + + poolInfo = udp.SocketEventArgsPool.BufferPoolInfo; + poolInfo.Type.Should().Be(typeof(DirectBufferPool)); + poolInfo.Free.Should().Be(poolInfo.TotalSize); + poolInfo.Used.Should().Be(0); + } + [Fact] public void The_UDP_Fire_and_Forget_implementation_must_call_SocketOption_beforeBind_method_before_bind() { diff --git a/src/core/Akka/Actor/BuiltInActors.cs b/src/core/Akka/Actor/BuiltInActors.cs index a2aa954b9c4..e73d266ecdf 100644 --- a/src/core/Akka/Actor/BuiltInActors.cs +++ b/src/core/Akka/Actor/BuiltInActors.cs @@ -225,14 +225,13 @@ public DeadLetterActorRef(IActorRefProvider provider, ActorPath path, EventStrea protected override void TellInternal(object message, IActorRef sender) { if (message == null) throw new InvalidMessageException("Message is null"); - var i = message as Identify; - if (i != null) + if (message is Identify i) { sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody)); return; } - var d = message as DeadLetter; - if (d != null) + + if (message is DeadLetter d) { if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); } return; @@ -248,8 +247,7 @@ protected override void TellInternal(object message, IActorRef sender) /// TBD protected override bool SpecialHandle(object message, IActorRef sender) { - var w = message as Watch; - if (w != null) + if (message is Watch w) { if (!w.Watchee.Equals(this) && !w.Watcher.Equals(this)) { diff --git a/src/core/Akka/Actor/EmptyLocalActorRef.cs b/src/core/Akka/Actor/EmptyLocalActorRef.cs index 2c37220d98a..1d9ae7b7c36 100644 --- a/src/core/Akka/Actor/EmptyLocalActorRef.cs +++ b/src/core/Akka/Actor/EmptyLocalActorRef.cs @@ -84,51 +84,52 @@ public override void SendSystemMessage(ISystemMessage message) /// TBD protected virtual bool SpecialHandle(object message, IActorRef sender) { - if (message is Watch watch) + switch (message) { - if (watch.Watchee.Equals(this) && !watch.Watcher.Equals(this)) + case Watch watch: { - watch.Watcher.SendSystemMessage(new DeathWatchNotification(watch.Watchee, existenceConfirmed: false, addressTerminated: false)); - } - return true; - } - if (message is Unwatch) - return true; //Just ignore - - if (message is Identify identify) - { - sender.Tell(new ActorIdentity(identify.MessageId, null)); - return true; - } - - if (message is ActorSelectionMessage actorSelectionMessage) - { - if (actorSelectionMessage.Message is Identify selectionIdentify) - { - if (!actorSelectionMessage.WildCardFanOut) - sender.Tell(new ActorIdentity(selectionIdentify.MessageId, null)); - } - else - { - if (actorSelectionMessage.Message is IDeadLetterSuppression selectionDeadLetterSuppression) + if (watch.Watchee.Equals(this) && !watch.Watcher.Equals(this)) { - PublishSupressedDeadLetter(selectionDeadLetterSuppression, sender); + watch.Watcher.SendSystemMessage(new DeathWatchNotification(watch.Watchee, existenceConfirmed: false, addressTerminated: false)); } - else + return true; + } + + case Unwatch _: + return true; //Just ignore + + case Identify identify: + sender.Tell(new ActorIdentity(identify.MessageId, null)); + return true; + + case ActorSelectionMessage actorSelectionMessage: + { + switch (actorSelectionMessage.Message) { - _eventStream.Publish(new DeadLetter(actorSelectionMessage.Message, sender.IsNobody() ? _provider.DeadLetters : sender, this)); + case Identify selectionIdentify: + { + if (!actorSelectionMessage.WildCardFanOut) + sender.Tell(new ActorIdentity(selectionIdentify.MessageId, null)); + break; + } + case IDeadLetterSuppression selectionDeadLetterSuppression: + PublishSupressedDeadLetter(selectionDeadLetterSuppression, sender); + break; + default: + _eventStream.Publish(new DeadLetter(actorSelectionMessage.Message, sender.IsNobody() ? _provider.DeadLetters : sender, this)); + break; } - } - return true; - } - if (message is IDeadLetterSuppression deadLetterSuppression) - { - PublishSupressedDeadLetter(deadLetterSuppression, sender); - return true; + return true; + } + + case IDeadLetterSuppression deadLetterSuppression: + PublishSupressedDeadLetter(deadLetterSuppression, sender); + return true; + + default: + return false; } - - return false; } private void PublishSupressedDeadLetter(IDeadLetterSuppression msg, IActorRef sender) diff --git a/src/core/Akka/IO/Buffers/DirectBufferPool.cs b/src/core/Akka/IO/Buffers/DirectBufferPool.cs index 1e5315913bc..4d112565b81 100644 --- a/src/core/Akka/IO/Buffers/DirectBufferPool.cs +++ b/src/core/Akka/IO/Buffers/DirectBufferPool.cs @@ -33,6 +33,22 @@ protected BufferPoolAllocationException(SerializationInfo info, StreamingContext } } + public class BufferPoolInfo + { + public BufferPoolInfo(Type type, long totalSize, long free, long used) + { + Type = type; + TotalSize = totalSize; + Free = free; + Used = used; + } + + public Type Type { get; } + public long TotalSize { get; } + public long Free { get; } + public long Used { get; } + } + /// /// An interface used to acquire/release recyclable chunks of /// bytes to be reused without need to triggering GC. @@ -70,6 +86,8 @@ public interface IBufferPool /// /// void Release(IEnumerable buf); + + BufferPoolInfo Diagnostics(); } /// @@ -131,6 +149,17 @@ public DirectBufferPool(int bufferSize, int buffersPerSegment, int initialSegmen } } + public BufferPoolInfo Diagnostics() + => new BufferPoolInfo( + type: typeof(DirectBufferPool), + totalSize: _segments.Count * _segmentSize, + free: _buffers.Count * _bufferSize, + used: (_segments.Count * _segmentSize) - (_buffers.Count * _bufferSize)); + + public override string ToString() + => $"_bufferSize: [{_buffers}], _bufferPerSegment: [{_buffersPerSegment}], _segmentSize: [{_segmentSize}], " + + $"_segments.Count: [{_segments.Count}], _buffers.Count: [{_buffers.Count}]"; + private void AllocateSegment() { lock (_syncRoot) @@ -183,7 +212,7 @@ public IEnumerable Rent(int minimumSize) AllocateSegment(); } - throw new BufferPoolAllocationException($"Couldn't allocate enough byte buffer to fill the tolal requested size of {minimumSize} bytes"); + throw new BufferPoolAllocationException($"Couldn't allocate enough byte buffer to fill the total requested size of {minimumSize} bytes"); } catch { @@ -195,8 +224,10 @@ public IEnumerable Rent(int minimumSize) public void Release(ByteBuffer buf) { // only release buffers that have actually been taken from one of the segments - if (buf.Count == _bufferSize && _segments.Contains(buf.Array)) - _buffers.Push(buf); + if (buf.Count != _bufferSize || !_segments.Contains(buf.Array)) + throw new Exception("Wrong ArraySegment was released to DirectBufferPool"); + + _buffers.Push(buf); } public void Release(IEnumerable buffers) diff --git a/src/core/Akka/IO/Buffers/DisabledBufferPool.cs b/src/core/Akka/IO/Buffers/DisabledBufferPool.cs index 64b044ca286..db57672419f 100644 --- a/src/core/Akka/IO/Buffers/DisabledBufferPool.cs +++ b/src/core/Akka/IO/Buffers/DisabledBufferPool.cs @@ -50,7 +50,10 @@ public void Release(IEnumerable buffers) Release(buf); } } - + + public BufferPoolInfo Diagnostics() + => new BufferPoolInfo(typeof(DisabledBufferPool), 0, 0, 0); + private ByteBuffer RentOfSize(int size) { var bytes = new byte[size]; diff --git a/src/core/Akka/IO/SocketEventArgsPool.cs b/src/core/Akka/IO/SocketEventArgsPool.cs index f4a74bf33ec..7f498bff5cd 100644 --- a/src/core/Akka/IO/SocketEventArgsPool.cs +++ b/src/core/Akka/IO/SocketEventArgsPool.cs @@ -12,6 +12,7 @@ using System.Linq; using System.Net.Sockets; using System.Runtime.CompilerServices; +using Akka.Annotations; using Akka.IO.Buffers; using Akka.Util; @@ -21,48 +22,81 @@ public interface ISocketEventArgsPool { SocketAsyncEventArgs Acquire(IActorRef actor); void Release(SocketAsyncEventArgs e); + + BufferPoolInfo BufferPoolInfo { get; } } internal class PreallocatedSocketEventAgrsPool : ISocketEventArgsPool { + private readonly IBufferPool _bufferPool; private readonly EventHandler _onComplete; - private readonly ConcurrentStack _pool = new ConcurrentStack(); + private readonly ConcurrentQueue _pool = new ConcurrentQueue(); - public PreallocatedSocketEventAgrsPool(int initSize, EventHandler onComplete) + + public PreallocatedSocketEventAgrsPool(int initSize, IBufferPool bufferPool, EventHandler onComplete) { + _bufferPool = bufferPool; _onComplete = onComplete; for (var i = 0; i < initSize; i++) { - var e = CreateSocketAsyncEventArgs(); - _pool.Push(e); + var e = new SocketAsyncEventArgs { UserToken = null }; + e.Completed += _onComplete; + _pool.Enqueue(e); } } public SocketAsyncEventArgs Acquire(IActorRef actor) { - if (!_pool.TryPop(out var e)) - e = CreateSocketAsyncEventArgs(); + var buffer = _bufferPool.Rent(); + var acquired = false; + SocketAsyncEventArgs e = null; + while (!acquired) + { + try + { + if (!_pool.TryDequeue(out e)) + e = new SocketAsyncEventArgs(); - e.UserToken = actor; + e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); + e.UserToken = actor; + e.Completed += _onComplete; + acquired = true; + } + catch (InvalidOperationException) + { + // it can be that for some reason socket is in use and haven't closed yet. Dispose anyway to avoid leaks. + e?.Dispose(); + } + } return e; } public void Release(SocketAsyncEventArgs e) { - e.UserToken = null; - e.AcceptSocket = null; + if (e.Buffer != null) + { + _bufferPool.Release(new ArraySegment(e.Buffer, e.Offset, e.Count)); + } + if (e.BufferList != null) + { + foreach (var segment in e.BufferList) + { + _bufferPool.Release(segment); + } + } try { e.SetBuffer(null, 0, 0); - if (e.BufferList != null) - { - e.BufferList = null; - } - + e.BufferList = null; + + e.UserToken = null; + e.AcceptSocket = null; + e.RemoteEndPoint = null; + if (_pool.Count < 2048) // arbitrary taken max amount of free SAEA stored { - _pool.Push(e); + _pool.Enqueue(e); } else { @@ -76,12 +110,7 @@ public void Release(SocketAsyncEventArgs e) } } - private SocketAsyncEventArgs CreateSocketAsyncEventArgs() - { - var e = new SocketAsyncEventArgs { UserToken = null }; - e.Completed += _onComplete; - return e; - } + public BufferPoolInfo BufferPoolInfo => _bufferPool.Diagnostics(); } internal static class SocketAsyncEventArgsExtensions diff --git a/src/core/Akka/IO/Udp.cs b/src/core/Akka/IO/Udp.cs index 213994ab542..c23aab4cf99 100644 --- a/src/core/Akka/IO/Udp.cs +++ b/src/core/Akka/IO/Udp.cs @@ -34,45 +34,51 @@ public class Udp : ExtensionIdProvider { #region internal connection messages - internal abstract class SocketCompleted : INoSerializationVerificationNeeded { } + internal abstract class SocketCompleted : INoSerializationVerificationNeeded + { + public ByteString Data { get; } + + protected SocketCompleted(SocketAsyncEventArgs eventArgs) + { + Data = ByteString.CopyFrom(eventArgs.Buffer, eventArgs.Offset, eventArgs.BytesTransferred); + } + } internal sealed class SocketSent : SocketCompleted { - public readonly SocketAsyncEventArgs EventArgs; - - public SocketSent(SocketAsyncEventArgs eventArgs) + public SocketError SocketError { get; } + public int BytesTransferred { get; } + public SocketSent(SocketAsyncEventArgs eventArgs): base(eventArgs) { - EventArgs = eventArgs; + SocketError = eventArgs.SocketError; + BytesTransferred = eventArgs.BytesTransferred; } } internal sealed class SocketReceived : SocketCompleted { - public readonly SocketAsyncEventArgs EventArgs; + public bool IsIcmpError => SocketError == SocketError.ConnectionReset; + public SocketError SocketError { get; } + public EndPoint RemoteEndPoint { get; } - public SocketReceived(SocketAsyncEventArgs eventArgs) + public SocketReceived(SocketAsyncEventArgs eventArgs): base(eventArgs) { - EventArgs = eventArgs; + SocketError = eventArgs.SocketError; + RemoteEndPoint = eventArgs.RemoteEndPoint; } } internal sealed class SocketAccepted : SocketCompleted { - public readonly SocketAsyncEventArgs EventArgs; - - public SocketAccepted(SocketAsyncEventArgs eventArgs) + public SocketAccepted(SocketAsyncEventArgs eventArgs): base(eventArgs) { - EventArgs = eventArgs; } } internal sealed class SocketConnected : SocketCompleted { - public readonly SocketAsyncEventArgs EventArgs; - - public SocketConnected(SocketAsyncEventArgs eventArgs) + public SocketConnected(SocketAsyncEventArgs eventArgs): base(eventArgs) { - EventArgs = eventArgs; } } @@ -505,12 +511,14 @@ public UdpExt(ExtendedActorSystem system, UdpSettings settings) throw new ConfigurationException($"Cannot retrieve UDP buffer pool configuration: {settings.BufferPoolConfigPath} configuration node not found"); Setting = settings; - BufferPool = CreateBufferPool(system, bufferPoolConfig); Manager = system.SystemActorOf( props: Props.Create(() => new UdpManager(this)).WithDeploy(Deploy.Local), name: "IO-UDP-FF"); - SocketEventArgsPool = new PreallocatedSocketEventAgrsPool(settings.InitialSocketAsyncEventArgs, OnComplete); + SocketEventArgsPool = new PreallocatedSocketEventAgrsPool( + settings.InitialSocketAsyncEventArgs, + CreateBufferPool(system, bufferPoolConfig), + OnComplete); } /// @@ -518,11 +526,6 @@ public UdpExt(ExtendedActorSystem system, UdpSettings settings) /// public override IActorRef Manager { get; } - /// - /// A buffer pool used by current plugin. - /// - public IBufferPool BufferPool { get; } - /// /// TBD /// @@ -530,7 +533,7 @@ public UdpExt(ExtendedActorSystem system, UdpSettings settings) internal PreallocatedSocketEventAgrsPool SocketEventArgsPool { get; } - private IBufferPool CreateBufferPool(ExtendedActorSystem system, Config config) + private static IBufferPool CreateBufferPool(ExtendedActorSystem system, Config config) { if (config.IsNullOrEmpty()) throw ConfigurationException.NullOrEmptyConfig(); @@ -556,16 +559,18 @@ private void OnComplete(object sender, SocketAsyncEventArgs e) { var actorRef = e.UserToken as IActorRef; actorRef?.Tell(ResolveMessage(e)); + SocketEventArgsPool.Release(e); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private Udp.SocketCompleted ResolveMessage(SocketAsyncEventArgs e) + private static Udp.SocketCompleted ResolveMessage(SocketAsyncEventArgs e) { switch (e.LastOperation) { case SocketAsyncOperation.Receive: case SocketAsyncOperation.ReceiveFrom: return new Udp.SocketReceived(e); + /* case SocketAsyncOperation.Send: case SocketAsyncOperation.SendTo: return new Udp.SocketSent(e); @@ -573,6 +578,7 @@ private Udp.SocketCompleted ResolveMessage(SocketAsyncEventArgs e) return new Udp.SocketAccepted(e); case SocketAsyncOperation.Connect: return new Udp.SocketConnected(e); + */ default: throw new NotSupportedException($"Socket operation {e.LastOperation} is not supported"); } diff --git a/src/core/Akka/IO/UdpConnected.cs b/src/core/Akka/IO/UdpConnected.cs index 30b9c213c64..089b678bd1d 100644 --- a/src/core/Akka/IO/UdpConnected.cs +++ b/src/core/Akka/IO/UdpConnected.cs @@ -36,18 +36,20 @@ public class UdpConnected : ExtensionIdProvider internal abstract class SocketCompleted : INoSerializationVerificationNeeded { - public readonly SocketAsyncEventArgs EventArgs; + public ByteString Data { get; } protected SocketCompleted(SocketAsyncEventArgs eventArgs) { - EventArgs = eventArgs; + Data = ByteString.CopyFrom(eventArgs.Buffer, eventArgs.Offset, eventArgs.BytesTransferred); } } internal sealed class SocketSent : SocketCompleted { + public int BytesTransferred { get; } public SocketSent(SocketAsyncEventArgs eventArgs) : base(eventArgs) { + BytesTransferred = eventArgs.BytesTransferred; } } @@ -387,8 +389,10 @@ public UdpConnectedExt(ExtendedActorSystem system, UdpSettings settings) throw new ConfigurationException($"Cannot retrieve UDP buffer pool configuration: {settings.BufferPoolConfigPath} configuration node not found"); Settings = settings; - BufferPool = CreateBufferPool(system, bufferPoolConfig); - SocketEventArgsPool = new PreallocatedSocketEventAgrsPool(Settings.InitialSocketAsyncEventArgs, OnComplete); + SocketEventArgsPool = new PreallocatedSocketEventAgrsPool( + Settings.InitialSocketAsyncEventArgs, + CreateBufferPool(system, bufferPoolConfig), + OnComplete); Manager = system.SystemActorOf( props: Props.Create(() => new UdpConnectedManager(this)).WithDeploy(Deploy.Local), name: "IO-UDP-CONN"); @@ -399,15 +403,11 @@ public UdpConnectedExt(ExtendedActorSystem system, UdpSettings settings) /// public override IActorRef Manager { get; } - /// - /// A buffer pool used by current plugin. - /// - public IBufferPool BufferPool { get; } - internal ISocketEventArgsPool SocketEventArgsPool { get; } internal UdpSettings Settings { get; } - private IBufferPool CreateBufferPool(ExtendedActorSystem system, Config config) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static IBufferPool CreateBufferPool(ExtendedActorSystem system, Config config) { if (config.IsNullOrEmpty()) throw ConfigurationException.NullOrEmptyConfig(); @@ -433,16 +433,18 @@ private void OnComplete(object sender, SocketAsyncEventArgs e) { var actorRef = e.UserToken as IActorRef; actorRef?.Tell(ResolveMessage(e)); + SocketEventArgsPool.Release(e); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private UdpConnected.SocketCompleted ResolveMessage(SocketAsyncEventArgs e) + private static UdpConnected.SocketCompleted ResolveMessage(SocketAsyncEventArgs e) { switch (e.LastOperation) { case SocketAsyncOperation.Receive: case SocketAsyncOperation.ReceiveFrom: return new UdpConnected.SocketReceived(e); + /* case SocketAsyncOperation.Send: case SocketAsyncOperation.SendTo: return new UdpConnected.SocketSent(e); @@ -450,6 +452,7 @@ private UdpConnected.SocketCompleted ResolveMessage(SocketAsyncEventArgs e) return new UdpConnected.SocketAccepted(e); case SocketAsyncOperation.Connect: return new UdpConnected.SocketConnected(e); + */ default: throw new NotSupportedException($"Socket operation {e.LastOperation} is not supported"); } diff --git a/src/core/Akka/IO/UdpConnectedManager.cs b/src/core/Akka/IO/UdpConnectedManager.cs index fb9ecd1997d..71e8602ddd9 100644 --- a/src/core/Akka/IO/UdpConnectedManager.cs +++ b/src/core/Akka/IO/UdpConnectedManager.cs @@ -8,7 +8,6 @@ using System; using Akka.Actor; using Akka.Annotations; -using Akka.Event; namespace Akka.IO { @@ -29,7 +28,6 @@ class UdpConnectedManager : ActorBase public UdpConnectedManager(UdpConnectedExt udpConn) { _udpConn = udpConn; - Context.System.EventStream.Subscribe(Self, typeof(DeadLetter)); } protected override bool Receive(object message) @@ -42,22 +40,7 @@ protected override bool Receive(object message) Context.ActorOf(Props.Create(() => new UdpConnection(_udpConn, commander, connect))); return true; } - case DeadLetter dl when dl.Message is UdpConnected.SocketCompleted completed: - { - var e = completed.EventArgs; - if (e.Buffer != null) - { - // no need to check for e.BufferList: release buffer only - // on complete reads, which are always mono-buffered - var buffer = new ByteBuffer(e.Buffer, e.Offset, e.Count); - _udpConn.BufferPool.Release(buffer); - } - _udpConn.SocketEventArgsPool.Release(e); - return true; - } - case DeadLetter _: return true; default: throw new ArgumentException($"The supplied message type [{message.GetType()}] is invalid. Only Connect messages are supported.", nameof(message)); - } } diff --git a/src/core/Akka/IO/UdpConnection.cs b/src/core/Akka/IO/UdpConnection.cs index 4d7300d6e43..cb48d354be1 100644 --- a/src/core/Akka/IO/UdpConnection.cs +++ b/src/core/Akka/IO/UdpConnection.cs @@ -56,7 +56,7 @@ public UdpConnection(UdpConnectedExt udp, IActorRef commander, Connect connect) } } - private Tuple _pendingSend = null; + private (Send send, IActorRef sender)? _pendingSend = null; private bool WritePending => _pendingSend != null; private Receive Resolving(DnsEndPoint remoteAddress) => message => @@ -116,7 +116,12 @@ private bool Connected(object message) } return true; } - case SocketReceived socketReceived: DoRead(socketReceived, _connect.Handler); return true; + + case SocketReceived socketReceived: + _pendingReceive = null; + DoRead(socketReceived, _connect.Handler); + return true; + case Disconnect _: { Log.Debug("Closing UDP connection to [{0}]", _connect.RemoteAddress); @@ -139,7 +144,7 @@ private bool Connected(object message) { if (!send.Payload.IsEmpty) { - _pendingSend = Tuple.Create(send, Sender); + _pendingSend = (send, Sender); DoWrite(); } else @@ -151,26 +156,25 @@ private bool Connected(object message) return true; } case SocketSent sent: - { - if (_pendingSend.Item1.WantsAck) - _pendingSend.Item2.Tell(_pendingSend.Item1.Ack); - if (Udp.Settings.TraceLogging) - Log.Debug("Wrote [{0}] bytes to socket", sent.EventArgs.BytesTransferred); - _pendingSend = null; - Udp.SocketEventArgsPool.Release(sent.EventArgs); - return true; - } + { + if (_pendingSend == null) + throw new Exception("There are no pending sent"); + + var (send, sender) = _pendingSend.Value; + if (send.WantsAck) + sender.Tell(send.Ack); + if (Udp.Settings.TraceLogging) + Log.Debug("Wrote [{0}] bytes to socket", sent.BytesTransferred); + _pendingSend = null; + return true; + } default: return false; } } private void DoRead(SocketReceived received, IActorRef handler) { - var e = received.EventArgs; - var buffer = new ByteBuffer(e.Buffer, e.Offset, e.BytesTransferred); - var data = new Received(ByteString.CopyFrom(buffer)); - Udp.BufferPool.Release(buffer); - Udp.SocketEventArgsPool.Release(e); + var data = new Received(received.Data); if (!_readingSuspended) { @@ -184,8 +188,7 @@ private void DoWrite() { try { - var send = _pendingSend.Item1; - var sender = _pendingSend.Item2; + var (send, sender) = _pendingSend.Value; var data = send.Payload; var bytesWritten = _socket.Send(data.Buffers); @@ -233,19 +236,20 @@ private void ReportConnectFailure(Action thunk) } } + private SocketAsyncEventArgs _pendingReceive; + private void ReceiveAsync() { + if (_pendingReceive != null) + return; + var e = Udp.SocketEventArgsPool.Acquire(Self); - var buffer = Udp.BufferPool.Rent(); - e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); + _pendingReceive = e; if (!_socket.ReceiveAsync(e)) + { Self.Tell(new SocketReceived(e)); - } - - private void SendAsync(SocketAsyncEventArgs e) - { - if (!_socket.SendToAsync(e)) - Self.Tell(new SocketSent(e)); + Udp.SocketEventArgsPool.Release(e); + } } } } diff --git a/src/core/Akka/IO/UdpListener.cs b/src/core/Akka/IO/UdpListener.cs index 2935c6832bc..e74cff9ce19 100644 --- a/src/core/Akka/IO/UdpListener.cs +++ b/src/core/Akka/IO/UdpListener.cs @@ -23,7 +23,7 @@ namespace Akka.IO /// INTERNAL API /// [InternalApi] - class UdpListener : WithUdpSend, IRequiresMessageQueue + internal class UdpListener : WithUdpSend, IRequiresMessageQueue { private readonly IActorRef _bindCommander; private readonly Bind _bind; @@ -94,7 +94,7 @@ private bool ReadHandlers(object message) ReceiveAsync(); return true; case SocketReceived received: - DoReceive(received.EventArgs, _bind.Handler); + DoReceive(received, _bind.Handler); return true; case Unbind _: Log.Debug("Unbinding endpoint [{0}]", _bind.LocalAddress); @@ -114,42 +114,20 @@ private bool ReadHandlers(object message) return false; } - private void DoReceive(SocketAsyncEventArgs e, IActorRef handler) + private void DoReceive(SocketReceived e, IActorRef handler) { - try + if(e.IsIcmpError) { - if (!IsICMPError(e)) - { - if (e.SocketError != SocketError.Success) - throw new SocketException((int)e.SocketError); - - handler.Tell(new Received(ByteString.CopyFrom(e.Buffer, e.Offset, e.BytesTransferred), - e.RemoteEndPoint)); - } - + Log.Debug("Ignoring client connection reset."); ReceiveAsync(); + return; } - finally - { - var buffer = new ByteBuffer(e.Buffer, e.Offset, e.Count); - Udp.SocketEventArgsPool.Release(e); - Udp.BufferPool.Release(buffer); - } - } - /// - /// Checks if the socket event is an ICMP error message. - /// - /// - private bool IsICMPError(SocketAsyncEventArgs e) - { - if (e.SocketError == SocketError.ConnectionReset) - { - Log.Debug("Ignoring client connection reset."); - return true; - } + if (e.SocketError != SocketError.Success) + throw new SocketException((int)e.SocketError); - return false; + handler.Tell(new Received(e.Data, e.RemoteEndPoint)); + ReceiveAsync(); } /// @@ -174,12 +152,12 @@ protected override void PostStop() private void ReceiveAsync() { var e = Udp.SocketEventArgsPool.Acquire(Self); - - var buffer = Udp.BufferPool.Rent(); - e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); e.RemoteEndPoint = Socket.LocalEndPoint; if (!Socket.ReceiveFromAsync(e)) + { Self.Tell(new SocketReceived(e)); + Udp.SocketEventArgsPool.Release(e); + } } } } diff --git a/src/core/Akka/IO/WithUdpSend.cs b/src/core/Akka/IO/WithUdpSend.cs index 03d8d1656e1..8306889eb94 100644 --- a/src/core/Akka/IO/WithUdpSend.cs +++ b/src/core/Akka/IO/WithUdpSend.cs @@ -46,7 +46,6 @@ public bool SendHandlers(object message) _pendingSend = send; _pendingCommander = Sender; - //var e = Udp.SocketEventArgsPool.Acquire(Self); if (send.Target is DnsEndPoint dns) { var resolved = Dns.ResolveName(dns.Host, Context.System, Self); @@ -83,10 +82,10 @@ public bool SendHandlers(object message) } case SocketSent sent: { - if (sent.EventArgs.SocketError == SocketError.Success) + if (sent.SocketError == SocketError.Success) { if (Udp.Setting.TraceLogging) - _log.Debug("Wrote [{0}] bytes to channel", sent.EventArgs.BytesTransferred); + _log.Debug("Wrote [{0}] bytes to channel", sent.BytesTransferred); if (_pendingSend.WantsAck) _pendingCommander.Tell(_pendingSend.Ack); diff --git a/src/core/Akka/Util/ByteString.cs b/src/core/Akka/Util/ByteString.cs index 45bf403dec5..7304690759c 100644 --- a/src/core/Akka/Util/ByteString.cs +++ b/src/core/Akka/Util/ByteString.cs @@ -40,6 +40,11 @@ internal static ByteString FromBuffers(IEnumerator buffers) return FromBytes(cached); } + internal static ByteString FromArraySegments(IList> segments) + => FromBytes(segments + .Where(s => s.Array != null) + .Select(s => new ByteBuffer(s.Array, s.Offset, s.Count))); + /// /// Creates a new by copying a provided byte array. /// From c459e5178cac957d4ba55d5e38d6e055adcb0cd4 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 2 Dec 2021 00:44:59 +0700 Subject: [PATCH 2/9] Revert changes that are not connected to UDP --- src/core/Akka/Actor/BuiltInActors.cs | 10 +-- src/core/Akka/Actor/EmptyLocalActorRef.cs | 75 +++++++++++------------ src/core/Akka/Util/ByteString.cs | 5 -- 3 files changed, 43 insertions(+), 47 deletions(-) diff --git a/src/core/Akka/Actor/BuiltInActors.cs b/src/core/Akka/Actor/BuiltInActors.cs index e73d266ecdf..a2aa954b9c4 100644 --- a/src/core/Akka/Actor/BuiltInActors.cs +++ b/src/core/Akka/Actor/BuiltInActors.cs @@ -225,13 +225,14 @@ public DeadLetterActorRef(IActorRefProvider provider, ActorPath path, EventStrea protected override void TellInternal(object message, IActorRef sender) { if (message == null) throw new InvalidMessageException("Message is null"); - if (message is Identify i) + var i = message as Identify; + if (i != null) { sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody)); return; } - - if (message is DeadLetter d) + var d = message as DeadLetter; + if (d != null) { if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); } return; @@ -247,7 +248,8 @@ protected override void TellInternal(object message, IActorRef sender) /// TBD protected override bool SpecialHandle(object message, IActorRef sender) { - if (message is Watch w) + var w = message as Watch; + if (w != null) { if (!w.Watchee.Equals(this) && !w.Watcher.Equals(this)) { diff --git a/src/core/Akka/Actor/EmptyLocalActorRef.cs b/src/core/Akka/Actor/EmptyLocalActorRef.cs index 1d9ae7b7c36..2c37220d98a 100644 --- a/src/core/Akka/Actor/EmptyLocalActorRef.cs +++ b/src/core/Akka/Actor/EmptyLocalActorRef.cs @@ -84,52 +84,51 @@ public override void SendSystemMessage(ISystemMessage message) /// TBD protected virtual bool SpecialHandle(object message, IActorRef sender) { - switch (message) + if (message is Watch watch) { - case Watch watch: + if (watch.Watchee.Equals(this) && !watch.Watcher.Equals(this)) { - if (watch.Watchee.Equals(this) && !watch.Watcher.Equals(this)) - { - watch.Watcher.SendSystemMessage(new DeathWatchNotification(watch.Watchee, existenceConfirmed: false, addressTerminated: false)); - } - return true; + watch.Watcher.SendSystemMessage(new DeathWatchNotification(watch.Watchee, existenceConfirmed: false, addressTerminated: false)); } - - case Unwatch _: - return true; //Just ignore - - case Identify identify: - sender.Tell(new ActorIdentity(identify.MessageId, null)); - return true; - - case ActorSelectionMessage actorSelectionMessage: + return true; + } + if (message is Unwatch) + return true; //Just ignore + + if (message is Identify identify) + { + sender.Tell(new ActorIdentity(identify.MessageId, null)); + return true; + } + + if (message is ActorSelectionMessage actorSelectionMessage) + { + if (actorSelectionMessage.Message is Identify selectionIdentify) { - switch (actorSelectionMessage.Message) + if (!actorSelectionMessage.WildCardFanOut) + sender.Tell(new ActorIdentity(selectionIdentify.MessageId, null)); + } + else + { + if (actorSelectionMessage.Message is IDeadLetterSuppression selectionDeadLetterSuppression) { - case Identify selectionIdentify: - { - if (!actorSelectionMessage.WildCardFanOut) - sender.Tell(new ActorIdentity(selectionIdentify.MessageId, null)); - break; - } - case IDeadLetterSuppression selectionDeadLetterSuppression: - PublishSupressedDeadLetter(selectionDeadLetterSuppression, sender); - break; - default: - _eventStream.Publish(new DeadLetter(actorSelectionMessage.Message, sender.IsNobody() ? _provider.DeadLetters : sender, this)); - break; + PublishSupressedDeadLetter(selectionDeadLetterSuppression, sender); + } + else + { + _eventStream.Publish(new DeadLetter(actorSelectionMessage.Message, sender.IsNobody() ? _provider.DeadLetters : sender, this)); } - - return true; } - - case IDeadLetterSuppression deadLetterSuppression: - PublishSupressedDeadLetter(deadLetterSuppression, sender); - return true; - - default: - return false; + return true; } + + if (message is IDeadLetterSuppression deadLetterSuppression) + { + PublishSupressedDeadLetter(deadLetterSuppression, sender); + return true; + } + + return false; } private void PublishSupressedDeadLetter(IDeadLetterSuppression msg, IActorRef sender) diff --git a/src/core/Akka/Util/ByteString.cs b/src/core/Akka/Util/ByteString.cs index 7304690759c..45bf403dec5 100644 --- a/src/core/Akka/Util/ByteString.cs +++ b/src/core/Akka/Util/ByteString.cs @@ -40,11 +40,6 @@ internal static ByteString FromBuffers(IEnumerator buffers) return FromBytes(cached); } - internal static ByteString FromArraySegments(IList> segments) - => FromBytes(segments - .Where(s => s.Array != null) - .Select(s => new ByteBuffer(s.Array, s.Offset, s.Count))); - /// /// Creates a new by copying a provided byte array. /// From 9c72778113b7df4ead28062e46b74071c0cf3f94 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 2 Dec 2021 01:04:59 +0700 Subject: [PATCH 3/9] Code comment cleanup --- src/core/Akka/IO/Udp.cs | 9 --------- src/core/Akka/IO/UdpConnected.cs | 9 --------- 2 files changed, 18 deletions(-) diff --git a/src/core/Akka/IO/Udp.cs b/src/core/Akka/IO/Udp.cs index c23aab4cf99..7fad220ca20 100644 --- a/src/core/Akka/IO/Udp.cs +++ b/src/core/Akka/IO/Udp.cs @@ -570,15 +570,6 @@ private static Udp.SocketCompleted ResolveMessage(SocketAsyncEventArgs e) case SocketAsyncOperation.Receive: case SocketAsyncOperation.ReceiveFrom: return new Udp.SocketReceived(e); - /* - case SocketAsyncOperation.Send: - case SocketAsyncOperation.SendTo: - return new Udp.SocketSent(e); - case SocketAsyncOperation.Accept: - return new Udp.SocketAccepted(e); - case SocketAsyncOperation.Connect: - return new Udp.SocketConnected(e); - */ default: throw new NotSupportedException($"Socket operation {e.LastOperation} is not supported"); } diff --git a/src/core/Akka/IO/UdpConnected.cs b/src/core/Akka/IO/UdpConnected.cs index 089b678bd1d..64193e12b13 100644 --- a/src/core/Akka/IO/UdpConnected.cs +++ b/src/core/Akka/IO/UdpConnected.cs @@ -444,15 +444,6 @@ private static UdpConnected.SocketCompleted ResolveMessage(SocketAsyncEventArgs case SocketAsyncOperation.Receive: case SocketAsyncOperation.ReceiveFrom: return new UdpConnected.SocketReceived(e); - /* - case SocketAsyncOperation.Send: - case SocketAsyncOperation.SendTo: - return new UdpConnected.SocketSent(e); - case SocketAsyncOperation.Accept: - return new UdpConnected.SocketAccepted(e); - case SocketAsyncOperation.Connect: - return new UdpConnected.SocketConnected(e); - */ default: throw new NotSupportedException($"Socket operation {e.LastOperation} is not supported"); } From 74862f75e7bb54613c2e118709e3c495bc2524a8 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 2 Dec 2021 02:45:16 +0700 Subject: [PATCH 4/9] Update API Approval list --- .../CoreAPISpec.ApproveCore.approved.txt | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 254164cb973..987fe42da2e 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -3232,8 +3232,17 @@ namespace Akka.IO.Buffers public BufferPoolAllocationException(string message) { } protected BufferPoolAllocationException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { } } + public class BufferPoolInfo + { + public BufferPoolInfo(System.Type type, long totalSize, long free, long used) { } + public long Free { get; } + public long TotalSize { get; } + public System.Type Type { get; } + public long Used { get; } + } public interface IBufferPool { + Akka.IO.Buffers.BufferPoolInfo Diagnostics(); void Release(System.ArraySegment buf); void Release(System.Collections.Generic.IEnumerable> buf); System.ArraySegment Rent(); @@ -3354,6 +3363,7 @@ namespace Akka.IO } public interface ISocketEventArgsPool { + Akka.IO.Buffers.BufferPoolInfo BufferPoolInfo { get; } System.Net.Sockets.SocketAsyncEventArgs Acquire(Akka.Actor.IActorRef actor); void Release(System.Net.Sockets.SocketAsyncEventArgs e); } @@ -3876,14 +3886,12 @@ namespace Akka.IO { public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system) { } public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { } - public Akka.IO.Buffers.IBufferPool BufferPool { get; } public override Akka.Actor.IActorRef Manager { get; } } public class UdpExt : Akka.IO.IOExtension { public UdpExt(Akka.Actor.ExtendedActorSystem system) { } public UdpExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { } - public Akka.IO.Buffers.IBufferPool BufferPool { get; } public override Akka.Actor.IActorRef Manager { get; } } public class static UdpExtensions From 8768a782bce4411c28b719de7b22625d7025323e Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 2 Dec 2021 03:45:02 +0700 Subject: [PATCH 5/9] Remove SocketEventArgsPool functionality --- src/core/Akka/IO/SocketEventArgsPool.cs | 55 +++---------------------- 1 file changed, 5 insertions(+), 50 deletions(-) diff --git a/src/core/Akka/IO/SocketEventArgsPool.cs b/src/core/Akka/IO/SocketEventArgsPool.cs index 7f498bff5cd..f0aae454ffa 100644 --- a/src/core/Akka/IO/SocketEventArgsPool.cs +++ b/src/core/Akka/IO/SocketEventArgsPool.cs @@ -30,44 +30,22 @@ internal class PreallocatedSocketEventAgrsPool : ISocketEventArgsPool { private readonly IBufferPool _bufferPool; private readonly EventHandler _onComplete; - private readonly ConcurrentQueue _pool = new ConcurrentQueue(); public PreallocatedSocketEventAgrsPool(int initSize, IBufferPool bufferPool, EventHandler onComplete) { _bufferPool = bufferPool; _onComplete = onComplete; - for (var i = 0; i < initSize; i++) - { - var e = new SocketAsyncEventArgs { UserToken = null }; - e.Completed += _onComplete; - _pool.Enqueue(e); - } } public SocketAsyncEventArgs Acquire(IActorRef actor) { var buffer = _bufferPool.Rent(); - var acquired = false; - SocketAsyncEventArgs e = null; - while (!acquired) - { - try - { - if (!_pool.TryDequeue(out e)) - e = new SocketAsyncEventArgs(); + var e = new SocketAsyncEventArgs(); - e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); - e.UserToken = actor; - e.Completed += _onComplete; - acquired = true; - } - catch (InvalidOperationException) - { - // it can be that for some reason socket is in use and haven't closed yet. Dispose anyway to avoid leaks. - e?.Dispose(); - } - } + e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); + e.UserToken = actor; + e.Completed += _onComplete; return e; } @@ -84,30 +62,7 @@ public void Release(SocketAsyncEventArgs e) _bufferPool.Release(segment); } } - - try - { - e.SetBuffer(null, 0, 0); - e.BufferList = null; - - e.UserToken = null; - e.AcceptSocket = null; - e.RemoteEndPoint = null; - - if (_pool.Count < 2048) // arbitrary taken max amount of free SAEA stored - { - _pool.Enqueue(e); - } - else - { - e.Dispose(); - } - } - catch (InvalidOperationException) - { - // it can be that for some reason socket is in use and haven't closed yet. Dispose anyway to avoid leaks. - e.Dispose(); - } + e.Dispose(); } public BufferPoolInfo BufferPoolInfo => _bufferPool.Diagnostics(); From 5a4251ea72d672622db45e241a371b04a6bf9345 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 2 Dec 2021 03:45:21 +0700 Subject: [PATCH 6/9] Fix specs --- .../IO/UdpConnectedIntegrationSpec.cs | 31 ++----- src/core/Akka.Tests/IO/UdpIntegrationSpec.cs | 82 +++++++++++++++---- 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs index 54ad93e90f0..20341f393a8 100644 --- a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs @@ -155,7 +155,8 @@ public void The_UDP_connection_oriented_implementation_must_to_send_batch_writes [Fact] public void The_UDP_connection_oriented_implementation_must_not_leak_memory() { - const int batchSize = 5000; + const int batchCount = 2000; + const int batchSize = 100; var serverAddress = _addresses[0]; var clientAddress = _addresses[1]; @@ -179,14 +180,15 @@ public void The_UDP_connection_oriented_implementation_must_not_leak_memory() var data = ByteString.FromString("Fly little packet!"); // send a lot of packets through, the byte buffer pool should not leak anything - for (var j = 0; j < batchSize; ++j) + for (var n = 0; n < batchCount; ++n) { - serverEp.Tell(UdpConnected.Send.Create(data)); - } + for (var j = 0; j < batchSize; ++j) + serverEp.Tell(UdpConnected.Send.Create(data)); - var msgs = client.ReceiveN(batchSize, TimeSpan.FromSeconds(10)); - var cast = msgs.Cast(); - cast.Sum(m => m.Data.Count).Should().Be(data.Count * batchSize); + var msgs = client.ReceiveN(batchSize, TimeSpan.FromSeconds(10)); + var cast = msgs.Cast(); + cast.Sum(m => m.Data.Count).Should().Be(data.Count * batchSize); + } // stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected serverEp.Tell(UdpConnected.Disconnect.Instance, server); @@ -202,20 +204,5 @@ public void The_UDP_connection_oriented_implementation_must_not_leak_memory() poolInfo.Free.Should().Be(poolInfo.TotalSize); poolInfo.Used.Should().Be(0); } - - internal class UdpClientHandler: ReceiveActor - { - public int ReceivedMessages; - public int TotalData; - public UdpClientHandler() - { - Receive(rcv => - { - ReceivedMessages++; - TotalData += rcv.Data.Count; - }); - ReceiveAny(obj => throw new Exception("Invalid message")); - } - } } } diff --git a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs index 795cb42c792..3f6c7ec7c24 100644 --- a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs @@ -121,27 +121,28 @@ public void The_UDP_Fire_and_Forget_implementation_must_be_able_to_send_several_ { var serverAddress = _addresses[0]; var clientAddress = _addresses[1]; - var server = BindUdp(serverAddress, TestActor); - var client = BindUdp(clientAddress, TestActor); - var data = ByteString.FromString("Fly little packet!"); + var serverProbe = CreateTestProbe(); + var clientProbe = CreateTestProbe(); + var server = BindUdp(serverAddress, serverProbe); + var client = BindUdp(clientAddress, clientProbe); void CheckSendingToClient(int iteration) { - server.Tell(Udp.Send.Create(data, clientAddress)); - ExpectMsg(x => + server.Tell(Udp.Send.Create(ByteString.FromString(iteration.ToString()), clientAddress)); + clientProbe.ExpectMsg(x => { - x.Data.ShouldBe(data); - x.Sender.Is(serverAddress).ShouldBeTrue($"{x.Sender} was expected to be {serverAddress}"); + x.Data.ToString().ShouldBe(iteration.ToString()); + x.Sender.Is(serverAddress).ShouldBeTrue($"Client sender {x.Sender} was expected to be {serverAddress}"); }, hint: $"sending to client failed in {iteration} iteration"); } void CheckSendingToServer(int iteration) { - client.Tell(Udp.Send.Create(data, serverAddress)); - ExpectMsg(x => + client.Tell(Udp.Send.Create(ByteString.FromString(iteration.ToString()), serverAddress)); + serverProbe.ExpectMsg(x => { - x.Data.ShouldBe(data); - Assert.True(x.Sender.Is(clientAddress)); + x.Data.ToString().ShouldBe(iteration.ToString()); + x.Sender.Is(clientAddress).ShouldBeTrue($"Server sender {x.Sender} was expected to be {clientAddress}"); }, hint: $"sending to client failed in {iteration} iteration"); } @@ -206,7 +207,8 @@ void CheckSendingToServer(ByteString expected) [Fact] public void The_UDP_Fire_and_Forget_implementation_must_not_leak_memory() { - const int batchSize = 5000; + const int batchCount = 2000; + const int batchSize = 100; var serverAddress = _addresses[0]; var clientAddress = _addresses[1]; @@ -225,12 +227,15 @@ public void The_UDP_Fire_and_Forget_implementation_must_not_leak_memory() var data = ByteString.FromString("Fly little packet!"); // send a lot of packets through, the byte buffer pool should not leak anything - for (int i = 0; i < batchSize; i++) - server.Tell(Udp.Send.Create(data, clientAddress)); + for (var n = 0; n < batchCount; ++n) + { + for (var i = 0; i < batchSize; i++) + server.Tell(Udp.Send.Create(data, clientAddress)); - var msgs = clientProbe.ReceiveN(batchSize); - var receives = msgs.Cast(); - receives.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize); + var msgs = clientProbe.ReceiveN(batchSize); + var receives = msgs.Cast(); + receives.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize); + } // stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected server.Tell(Udp.Unbind.Instance, serverProbe); @@ -247,6 +252,49 @@ public void The_UDP_Fire_and_Forget_implementation_must_not_leak_memory() poolInfo.Used.Should().Be(0); } + [Fact] + public void The_UDP_Fire_and_Forget_SimpleSender_implementation_must_not_leak_memory() + { + const int batchCount = 2000; + const int batchSize = 100; + + var udp = Udp.Instance.Apply(Sys); + var poolInfo = udp.SocketEventArgsPool.BufferPoolInfo; + poolInfo.Type.Should().Be(typeof(DirectBufferPool)); + poolInfo.Free.Should().Be(poolInfo.TotalSize); + poolInfo.Used.Should().Be(0); + + var serverAddress = _addresses[0]; + var serverProbe = CreateTestProbe(); + var server = BindUdp(serverAddress, serverProbe); + var sender = SimpleSender(); + + var data = ByteString.FromString("Fly little packet!"); + + // send a lot of packets through, the byte buffer pool should not leak anything + for (var n = 0; n < batchCount; ++n) + { + for (int i = 0; i < batchSize; i++) + sender.Tell(Udp.Send.Create(data, serverAddress)); + + var msgs = serverProbe.ReceiveN(batchSize); + var receives = msgs.Cast(); + receives.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize); + } + + // stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected + server.Tell(Udp.Unbind.Instance, serverProbe); + serverProbe.ExpectMsg(); + + // wait for all SocketAsyncEventArgs to be released + Thread.Sleep(1000); + + poolInfo = udp.SocketEventArgsPool.BufferPoolInfo; + poolInfo.Type.Should().Be(typeof(DirectBufferPool)); + poolInfo.Free.Should().Be(poolInfo.TotalSize); + poolInfo.Used.Should().Be(0); + } + [Fact] public void The_UDP_Fire_and_Forget_implementation_must_call_SocketOption_beforeBind_method_before_bind() { From 541b776b56a1c617f442570cfcb884e5688d595c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 2 Dec 2021 06:41:14 +0700 Subject: [PATCH 7/9] Add documentation in comments to describe the architectural changes, use Assert.Debug instead of throwing. --- src/core/Akka/IO/Buffers/DirectBufferPool.cs | 17 ++++++++++++++--- src/core/Akka/IO/SocketEventArgsPool.cs | 14 +++++++++++++- src/core/Akka/IO/UdpConnected.cs | 11 +++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/src/core/Akka/IO/Buffers/DirectBufferPool.cs b/src/core/Akka/IO/Buffers/DirectBufferPool.cs index 4d112565b81..12e3baf60c3 100644 --- a/src/core/Akka/IO/Buffers/DirectBufferPool.cs +++ b/src/core/Akka/IO/Buffers/DirectBufferPool.cs @@ -8,9 +8,12 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Runtime.Serialization; using Akka.Actor; using Akka.Configuration; +using Akka.Event; +using Debug = System.Diagnostics.Debug; namespace Akka.IO.Buffers { @@ -109,6 +112,7 @@ internal sealed class DirectBufferPool : IBufferPool private const int Retries = 30; private readonly object _syncRoot = new object(); + private readonly ILoggingAdapter _log; private readonly int _bufferSize; private readonly int _buffersPerSegment; private readonly int _segmentSize; @@ -126,17 +130,20 @@ public DirectBufferPool(ExtendedActorSystem system, Config config) : this( bufferSize: config.GetInt("buffer-size", 256), buffersPerSegment: config.GetInt("buffers-per-segment", 250), initialSegments: config.GetInt("initial-segments", 1), - maxSegments: config.GetInt("buffer-pool-limit", 1000)) + maxSegments: config.GetInt("buffer-pool-limit", 1000), + system) { } - public DirectBufferPool(int bufferSize, int buffersPerSegment, int initialSegments, int maxSegments) + public DirectBufferPool(int bufferSize, int buffersPerSegment, int initialSegments, int maxSegments, ActorSystem system) { if (bufferSize <= 0) throw new ArgumentException("Buffer size must be positive number", nameof(bufferSize)); if (buffersPerSegment <= 0) throw new ArgumentException("Number of buffers per segment must be positive", nameof(buffersPerSegment)); if (initialSegments <= 0) throw new ArgumentException("Number of initial segments must be positivie", nameof(initialSegments)); if (maxSegments < initialSegments) throw new ArgumentException("Maximum number of segments must not be less than the initial one", nameof(maxSegments)); + _log = Logging.GetLogger(system, GetType()); + _bufferSize = bufferSize; _buffersPerSegment = buffersPerSegment; _segmentSize = bufferSize * buffersPerSegment; @@ -224,8 +231,12 @@ public IEnumerable Rent(int minimumSize) public void Release(ByteBuffer buf) { // only release buffers that have actually been taken from one of the segments + Debug.Assert( + buf.Count != _bufferSize || !_segments.Contains(buf.Array), + "Wrong ArraySegment was returned to the pool. WARNING: This can lead to memory leak."); + if (buf.Count != _bufferSize || !_segments.Contains(buf.Array)) - throw new Exception("Wrong ArraySegment was released to DirectBufferPool"); + return; _buffers.Push(buf); } diff --git a/src/core/Akka/IO/SocketEventArgsPool.cs b/src/core/Akka/IO/SocketEventArgsPool.cs index f0aae454ffa..662955d3fbc 100644 --- a/src/core/Akka/IO/SocketEventArgsPool.cs +++ b/src/core/Akka/IO/SocketEventArgsPool.cs @@ -26,18 +26,30 @@ public interface ISocketEventArgsPool BufferPoolInfo BufferPoolInfo { get; } } + // This class __does not__ pool and reuse SocketAsyncEventArgs anymore. Reusing SocketAsyncEventArgs with + // multiple Socket instances is dangerous because SocketAsyncEventArgs is not a simple struct or POCO, + // it actually held internal states that can wreak havoc if being used in another socket instance. + // It is impossible to clear a SocketAsyncEventArgs object and the hassle of trying to handle every single + // edge case outweigh the speed and memory gain of pooling the instances. internal class PreallocatedSocketEventAgrsPool : ISocketEventArgsPool { + // Byte buffer pool is moved here to reduce the chance that a memory segment got mis-managed + // and not released properly. We only need to worry about acquiring and releasing SocketAsyncEventArgs + // and not worry about having to check to see if we need to rent or release any buffer. + // + // There is no reason why users or developers would need to touch memory management code because it is + // very specific for providing byte buffers for SocketAsyncEventArgs private readonly IBufferPool _bufferPool; + private readonly EventHandler _onComplete; - public PreallocatedSocketEventAgrsPool(int initSize, IBufferPool bufferPool, EventHandler onComplete) { _bufferPool = bufferPool; _onComplete = onComplete; } + public SocketAsyncEventArgs Acquire(IActorRef actor) { var buffer = _bufferPool.Rent(); diff --git a/src/core/Akka/IO/UdpConnected.cs b/src/core/Akka/IO/UdpConnected.cs index 64193e12b13..d47ebff5e88 100644 --- a/src/core/Akka/IO/UdpConnected.cs +++ b/src/core/Akka/IO/UdpConnected.cs @@ -34,6 +34,17 @@ public class UdpConnected : ExtensionIdProvider { #region internal connection messages + // SocketAsyncEventArgs data are copied into these response messages instead of being referenced/embedded + // inside the message. This is done because it is very dangerous to embed SocketAsyncEventArgs in an actor + // message. + // + // SocketAsyncEventArgs might held a reference to a buffer who are managed by DirectBufferPool and + // an actor message might end up being sent to the DeadLetters mailbox, resulting in memory leak since the + // buffer would never get returned properly to the buffer pool. + // + // SocketAsyncEventArgs should never leave the ReceiveAsync() method and the OnComplete callback. It should + // be returned immediately to PreallocatedSocketEventAgrsPool so that the buffer can be safely pooled back. + internal abstract class SocketCompleted : INoSerializationVerificationNeeded { public ByteString Data { get; } From 926b220940857648a794a2161a52ece73fa13c13 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 2 Dec 2021 21:39:59 +0700 Subject: [PATCH 8/9] Remove the logger, we're using Debug.Assert instead --- src/core/Akka/IO/Buffers/DirectBufferPool.cs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/core/Akka/IO/Buffers/DirectBufferPool.cs b/src/core/Akka/IO/Buffers/DirectBufferPool.cs index 12e3baf60c3..fe090ce91d0 100644 --- a/src/core/Akka/IO/Buffers/DirectBufferPool.cs +++ b/src/core/Akka/IO/Buffers/DirectBufferPool.cs @@ -112,7 +112,6 @@ internal sealed class DirectBufferPool : IBufferPool private const int Retries = 30; private readonly object _syncRoot = new object(); - private readonly ILoggingAdapter _log; private readonly int _bufferSize; private readonly int _buffersPerSegment; private readonly int _segmentSize; @@ -130,20 +129,17 @@ public DirectBufferPool(ExtendedActorSystem system, Config config) : this( bufferSize: config.GetInt("buffer-size", 256), buffersPerSegment: config.GetInt("buffers-per-segment", 250), initialSegments: config.GetInt("initial-segments", 1), - maxSegments: config.GetInt("buffer-pool-limit", 1000), - system) + maxSegments: config.GetInt("buffer-pool-limit", 1000)) { } - public DirectBufferPool(int bufferSize, int buffersPerSegment, int initialSegments, int maxSegments, ActorSystem system) + public DirectBufferPool(int bufferSize, int buffersPerSegment, int initialSegments, int maxSegments) { if (bufferSize <= 0) throw new ArgumentException("Buffer size must be positive number", nameof(bufferSize)); if (buffersPerSegment <= 0) throw new ArgumentException("Number of buffers per segment must be positive", nameof(buffersPerSegment)); if (initialSegments <= 0) throw new ArgumentException("Number of initial segments must be positivie", nameof(initialSegments)); if (maxSegments < initialSegments) throw new ArgumentException("Maximum number of segments must not be less than the initial one", nameof(maxSegments)); - _log = Logging.GetLogger(system, GetType()); - _bufferSize = bufferSize; _buffersPerSegment = buffersPerSegment; _segmentSize = bufferSize * buffersPerSegment; From 5faf777b8a65e3192ce96efc27668184e28488fb Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 2 Dec 2021 21:41:26 +0700 Subject: [PATCH 9/9] Clean up assert code --- src/core/Akka/IO/Buffers/DirectBufferPool.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/core/Akka/IO/Buffers/DirectBufferPool.cs b/src/core/Akka/IO/Buffers/DirectBufferPool.cs index fe090ce91d0..ee90a3f642d 100644 --- a/src/core/Akka/IO/Buffers/DirectBufferPool.cs +++ b/src/core/Akka/IO/Buffers/DirectBufferPool.cs @@ -227,12 +227,11 @@ public IEnumerable Rent(int minimumSize) public void Release(ByteBuffer buf) { // only release buffers that have actually been taken from one of the segments - Debug.Assert( - buf.Count != _bufferSize || !_segments.Contains(buf.Array), - "Wrong ArraySegment was returned to the pool. WARNING: This can lead to memory leak."); - if (buf.Count != _bufferSize || !_segments.Contains(buf.Array)) + { + Debug.Assert(false, "Wrong ArraySegment was returned to the pool. WARNING: This can lead to memory leak."); return; + } _buffers.Push(buf); }