-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix UDP memory leak #5404
Fix UDP memory leak #5404
Conversation
if (buf.Count == _bufferSize && _segments.Contains(buf.Array)) | ||
_buffers.Push(buf); | ||
if (buf.Count != _bufferSize || !_segments.Contains(buf.Array)) | ||
throw new Exception("Wrong ArraySegment<byte> was released to DirectBufferPool"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sanity check to make sure that any ArraySegment<byte>
returned to the buffer actually belongs to the buffer, should only happen during development.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would it only happen during development?
} | ||
|
||
internal class PreallocatedSocketEventAgrsPool : ISocketEventArgsPool | ||
{ | ||
private readonly IBufferPool _bufferPool; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Buffer pool belongs to the socket event pool now, no one should touch this. ever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment saying this and explaining why
private readonly EventHandler<SocketAsyncEventArgs> _onComplete; | ||
private readonly ConcurrentStack<SocketAsyncEventArgs> _pool = new ConcurrentStack<SocketAsyncEventArgs>(); | ||
private readonly ConcurrentQueue<SocketAsyncEventArgs> _pool = new ConcurrentQueue<SocketAsyncEventArgs>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to make sure that the content of the pool are rotated evenly, instead of only using the top ones. Minimizes the problem where SocketAsyncEventArgs could not be reused because it is locked by the previous socket.
To be honest, this whole class is a bad idea because SocketAsyncEventArgs are only supposed to be recycled if they're used by a single socket.
} | ||
} | ||
|
||
public SocketAsyncEventArgs Acquire(IActorRef actor) | ||
{ | ||
if (!_pool.TryPop(out var e)) | ||
e = CreateSocketAsyncEventArgs(); | ||
var buffer = _bufferPool.Rent(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Byte buffer pool rent can only happen here and no where else.
} | ||
catch (InvalidOperationException) | ||
{ | ||
// it can be that for some reason socket is in use and haven't closed yet. Dispose anyway to avoid leaks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will always be the case if this pool are being used by multiple sockets...
|
||
e.UserToken = actor; | ||
e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is fine because we only rent once and if we fail to acquire an instance, we immediately dispose it.
|
||
protected SocketCompleted(SocketAsyncEventArgs eventArgs) | ||
{ | ||
Data = ByteString.CopyFrom(eventArgs.Buffer, eventArgs.Offset, eventArgs.BytesTransferred); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of embedding the SocketAsyncEventArgs, we extract the data and store that instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And we make a copy of the data, so there's no problems if the SocketAsyncEventArgs
get processed and the buffer pool is released.
{ | ||
switch (e.LastOperation) | ||
{ | ||
case SocketAsyncOperation.Receive: | ||
case SocketAsyncOperation.ReceiveFrom: | ||
return new Udp.SocketReceived(e); | ||
case SocketAsyncOperation.Send: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are never used. Removed to remove red herrings and possible problem in the future.
default: return false; | ||
} | ||
} | ||
|
||
private void DoRead(SocketReceived received, IActorRef handler) | ||
{ | ||
var e = received.EventArgs; | ||
var buffer = new ByteBuffer(e.Buffer, e.Offset, e.BytesTransferred); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should never happen anymore, all handled in the SocketAsyncEventArgs pool
var e = Udp.SocketEventArgsPool.Acquire(Self); | ||
var buffer = Udp.BufferPool.Rent(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, we don't have to worry about this anymore.
|
_onComplete = onComplete; | ||
for (var i = 0; i < initSize; i++) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed pool functionality
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some questions, nitpicks, and minor suggestions but this looks like a great structural improvement for Akka.IO.Udp. Nice work.
poolInfo = udpConnection.SocketEventArgsPool.BufferPoolInfo; | ||
poolInfo.Type.Should().Be(typeof(DirectBufferPool)); | ||
poolInfo.Free.Should().Be(poolInfo.TotalSize); | ||
poolInfo.Used.Should().Be(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
poolInfo = udp.SocketEventArgsPool.BufferPoolInfo; | ||
poolInfo.Type.Should().Be(typeof(DirectBufferPool)); | ||
poolInfo.Free.Should().Be(poolInfo.TotalSize); | ||
poolInfo.Used.Should().Be(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -131,6 +149,17 @@ public DirectBufferPool(int bufferSize, int buffersPerSegment, int initialSegmen | |||
} | |||
} | |||
|
|||
public BufferPoolInfo Diagnostics() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea
if (buf.Count == _bufferSize && _segments.Contains(buf.Array)) | ||
_buffers.Push(buf); | ||
if (buf.Count != _bufferSize || !_segments.Contains(buf.Array)) | ||
throw new Exception("Wrong ArraySegment<byte> was released to DirectBufferPool"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would it only happen during development?
} | ||
|
||
internal class PreallocatedSocketEventAgrsPool : ISocketEventArgsPool | ||
{ | ||
private readonly IBufferPool _bufferPool; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment saying this and explaining why
|
||
protected SocketCompleted(SocketAsyncEventArgs eventArgs) | ||
{ | ||
Data = ByteString.CopyFrom(eventArgs.Buffer, eventArgs.Offset, eventArgs.BytesTransferred); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And we make a copy of the data, so there's no problems if the SocketAsyncEventArgs
get processed and the buffer pool is released.
I mean its there so we can catch it while we're developing Akka, I think its better if I change that to an error log instead. |
You can use 'Debug.Assert' for that |
…use Assert.Debug instead of throwing.
…t into IO_Fix_UDP_memory_leak
Done with the changes |
Fix UDP memory leak problem because the rented byte buffer memory never got released when SocketAsyncEventArgs got lost.
Closes #5325
Udp
,UdpListener
,UdpConnect
, andUdpConnection
class. Required data are copied into the response messages instead of embeddingSocketAsyncEventArgs
inside theReceived
message.SocketAsyncEventArgs
will either be released immediately ifSocket.ReceiveAsync
returned false or released when theOnComplete
event fires.SocketAsyncEventArgs
pooling functionality, cachingSocketAsyncEventArgs
is causing a lot more grief than its worth, we would have to catch all the edge cases where it can fail.SocketAsyncEventArgs
is not a simple struct that can be cleaned and reused, it actually have internal states that can cause a lot of problems when reused.