-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
keep MsQuicConnection alive when streams are pending #52800
Changes from 5 commits
b24b9d5
93dbb7a
0012ed2
5572326
f268e01
188cc2b
cd40a5b
ade7ec5
b161cf5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -29,8 +29,7 @@ internal sealed class MsQuicConnection : QuicConnectionProvider | |||||
private readonly SafeMsQuicConfigurationHandle? _configuration; | ||||||
|
||||||
private readonly State _state = new State(); | ||||||
private GCHandle _stateHandle; | ||||||
private bool _disposed; | ||||||
private int _disposed; | ||||||
|
||||||
private IPEndPoint? _localEndPoint; | ||||||
private readonly EndPoint _remoteEndPoint; | ||||||
|
@@ -43,6 +42,7 @@ internal sealed class MsQuicConnection : QuicConnectionProvider | |||||
internal sealed class State | ||||||
{ | ||||||
public SafeMsQuicConnectionHandle Handle = null!; // set inside of MsQuicConnection ctor. | ||||||
public GCHandle StateGCHandle; | ||||||
|
||||||
// These exists to prevent GC of the MsQuicConnection in the middle of an async op (Connect or Shutdown). | ||||||
public MsQuicConnection? Connection; | ||||||
|
@@ -53,6 +53,8 @@ internal sealed class State | |||||
|
||||||
public bool Connected; | ||||||
public long AbortErrorCode = -1; | ||||||
public int StreamCount; | ||||||
wfurt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
private bool _closing; | ||||||
|
||||||
// Queue for accepted streams. | ||||||
// Backlog limit is managed by MsQuic so it can be unbounded here. | ||||||
|
@@ -61,30 +63,79 @@ internal sealed class State | |||||
SingleReader = true, | ||||||
SingleWriter = true, | ||||||
}); | ||||||
|
||||||
public void RemoveStream(MsQuicStream stream) | ||||||
{ | ||||||
bool releaseHandles; | ||||||
lock (this) | ||||||
{ | ||||||
StreamCount--; | ||||||
wfurt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
releaseHandles = _closing && StreamCount == 0; | ||||||
} | ||||||
|
||||||
if (releaseHandles) | ||||||
{ | ||||||
Handle?.Dispose(); | ||||||
if (StateGCHandle.IsAllocated) StateGCHandle.Free(); | ||||||
wfurt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
} | ||||||
|
||||||
public bool TryQueueNewStream(SafeMsQuicStreamHandle streamHandle, QUIC_STREAM_OPEN_FLAGS flags) | ||||||
{ | ||||||
var stream = new MsQuicStream(this, streamHandle, flags); | ||||||
if (AcceptQueue.Writer.TryWrite(stream)) | ||||||
{ | ||||||
return true; | ||||||
} | ||||||
else | ||||||
{ | ||||||
stream.Dispose(); | ||||||
return false; | ||||||
} | ||||||
} | ||||||
|
||||||
public bool TryAddStream(MsQuicStream stream) | ||||||
{ | ||||||
lock (this) | ||||||
{ | ||||||
if (_closing) | ||||||
{ | ||||||
return false; | ||||||
} | ||||||
|
||||||
StreamCount++; | ||||||
return true; | ||||||
} | ||||||
} | ||||||
|
||||||
// This is called under lock from connection dispose | ||||||
public void SetClosing() | ||||||
{ | ||||||
_closing = true; | ||||||
wfurt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
} | ||||||
|
||||||
// constructor for inbound connections | ||||||
public MsQuicConnection(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, SafeMsQuicConnectionHandle handle) | ||||||
{ | ||||||
_state.Handle = handle; | ||||||
_state.StateGCHandle = GCHandle.Alloc(_state); | ||||||
_state.Connected = true; | ||||||
_localEndPoint = localEndPoint; | ||||||
_remoteEndPoint = remoteEndPoint; | ||||||
_remoteCertificateRequired = false; | ||||||
_isServer = true; | ||||||
|
||||||
_stateHandle = GCHandle.Alloc(_state); | ||||||
|
||||||
try | ||||||
{ | ||||||
MsQuicApi.Api.SetCallbackHandlerDelegate( | ||||||
_state.Handle, | ||||||
s_connectionDelegate, | ||||||
GCHandle.ToIntPtr(_stateHandle)); | ||||||
GCHandle.ToIntPtr(_state.StateGCHandle)); | ||||||
} | ||||||
catch | ||||||
{ | ||||||
_stateHandle.Free(); | ||||||
_state.StateGCHandle.Free(); | ||||||
throw; | ||||||
} | ||||||
|
||||||
|
@@ -107,7 +158,7 @@ public MsQuicConnection(QuicClientConnectionOptions options) | |||||
_remoteCertificateValidationCallback = options.ClientAuthenticationOptions.RemoteCertificateValidationCallback; | ||||||
} | ||||||
|
||||||
_stateHandle = GCHandle.Alloc(_state); | ||||||
_state.StateGCHandle = GCHandle.Alloc(_state); | ||||||
try | ||||||
{ | ||||||
// this handle is ref counted by MsQuic, so safe to dispose here. | ||||||
|
@@ -116,14 +167,14 @@ public MsQuicConnection(QuicClientConnectionOptions options) | |||||
uint status = MsQuicApi.Api.ConnectionOpenDelegate( | ||||||
MsQuicApi.Api.Registration, | ||||||
s_connectionDelegate, | ||||||
GCHandle.ToIntPtr(_stateHandle), | ||||||
GCHandle.ToIntPtr(_state.StateGCHandle), | ||||||
out _state.Handle); | ||||||
|
||||||
QuicExceptionHelpers.ThrowIfFailed(status, "Could not open the connection."); | ||||||
} | ||||||
catch | ||||||
{ | ||||||
_stateHandle.Free(); | ||||||
_state.StateGCHandle.Free(); | ||||||
throw; | ||||||
} | ||||||
|
||||||
|
@@ -198,9 +249,13 @@ private static uint HandleEventShutdownComplete(State state, ref ConnectionEvent | |||||
private static uint HandleEventNewStream(State state, ref ConnectionEvent connectionEvent) | ||||||
{ | ||||||
var streamHandle = new SafeMsQuicStreamHandle(connectionEvent.Data.PeerStreamStarted.Stream); | ||||||
var stream = new MsQuicStream(state, streamHandle, connectionEvent.Data.PeerStreamStarted.Flags); | ||||||
if (!state.TryQueueNewStream(streamHandle, connectionEvent.Data.PeerStreamStarted.Flags)) | ||||||
{ | ||||||
// This will call StreamCloseDelegate and free the stream. | ||||||
// We will return Success to the MsQuic to prevent double free. | ||||||
streamHandle.Dispose(); | ||||||
} | ||||||
|
||||||
state.AcceptQueue.Writer.TryWrite(stream); | ||||||
return MsQuicStatusCodes.Success; | ||||||
} | ||||||
|
||||||
|
@@ -488,16 +543,48 @@ public override void Dispose() | |||||
Dispose(false); | ||||||
} | ||||||
|
||||||
private async Task FlushAcceptQueue() | ||||||
{ | ||||||
try { | ||||||
// Writer may or may not be completed. | ||||||
_state.AcceptQueue.Writer.Complete(); | ||||||
} catch { }; | ||||||
wfurt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
await foreach (MsQuicStream item in _state.AcceptQueue.Reader.ReadAllAsync()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
{ | ||||||
item.Dispose(); | ||||||
} | ||||||
} | ||||||
|
||||||
private void Dispose(bool disposing) | ||||||
{ | ||||||
if (_disposed) | ||||||
int disposed = Interlocked.Exchange(ref _disposed, 1); | ||||||
if (disposed == 1) | ||||||
{ | ||||||
return; | ||||||
} | ||||||
wfurt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
_state?.Handle?.Dispose(); | ||||||
if (_stateHandle.IsAllocated) _stateHandle.Free(); | ||||||
_disposed = true; | ||||||
bool releaseHandles = false; | ||||||
lock (_state) | ||||||
{ | ||||||
_state.Connection = null; | ||||||
if (_state.StreamCount == 0) | ||||||
{ | ||||||
releaseHandles = true; | ||||||
} | ||||||
else | ||||||
{ | ||||||
// We have pending streams so we need to defer cleanup until last one is gone. | ||||||
_state.SetClosing(); | ||||||
} | ||||||
} | ||||||
|
||||||
FlushAcceptQueue().GetAwaiter().GetResult(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Connections are There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure what you suggest. We still need to flush for synchronous Dispose(). Can we do what ever needs to be done as follow-up? I would like to get this this in to get verifications on the crashes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's fine. |
||||||
if (releaseHandles) | ||||||
{ | ||||||
_state!.Handle?.Dispose(); | ||||||
if (_state.StateGCHandle.IsAllocated) _state.StateGCHandle.Free(); | ||||||
} | ||||||
} | ||||||
|
||||||
// TODO: this appears abortive and will cause prior successfully shutdown and closed streams to drop data. | ||||||
|
@@ -511,7 +598,7 @@ internal override ValueTask CloseAsync(long errorCode, CancellationToken cancell | |||||
|
||||||
private void ThrowIfDisposed() | ||||||
{ | ||||||
if (_disposed) | ||||||
if (_disposed == 1) | ||||||
wfurt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
{ | ||||||
throw new ObjectDisposedException(nameof(MsQuicStream)); | ||||||
} | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't fully understand why was the
StateGCHandle
moved insideState
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the connection is gone and state is kept alive because of the streams, we still want to lock state in place because we handed pointer to MsQuic. When we release all the streams, the original location is not accessible as we don't have link back to connection (and it may be gone) I did the move (sort of) in previous round using IntPtr. @CarnaViire and @jkotas suggested to preserve the type so I end up with this.