-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async stream initializers order matches existing #415
Conversation
streamInitializerProductContinuation: continuation, | ||
inboundStreamInitializer: { channel in | ||
inboundStreamInitializer(channel).map { output in | ||
continuation.yield(output) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this ordering is correct here. The problem is that we might yield the new child channel to the continuation here but the activation might still fail for reasons inside the child channel.
What I have done in other places was to create a promise that we fulfill inside here and combine that with the promise of child channel creation. Once both promises are successful we can yield the output to the continuation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this feedback Franz. I went away and built something which I think fulfills these requirements. There's a slight wrinkle in the H/2 case which meant I had to start dealing with Any
in a few places so that the generic Output
type didn't leak into making the HTTP2Handler
or StreamMultiplexer
types generic.
d275dcc
to
69f8db9
Compare
The API breakage the checker is picking up is under SPI and I believe it is fine. |
@@ -243,7 +243,6 @@ extension NIOHTTP2Handler { | |||
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get() | |||
} | |||
|
|||
|
|||
/// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am surprised to still see this method. I thought we got rid of any NIOAsyncChannel
APIs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So am I but I didn't think tackling the issue of its continued existence was in-scope for this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah sure, we should just make sure to remove this API here as well. I am fine if we do it in a follow up but maybe you could quickly check if this is just internal API right now and maybe RIP it out here as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking about it, I just removed the API and fixed up the test
let channelPromise: EventLoopPromise<Channel>? | ||
if let promise = promise { | ||
channelPromise = self.baseChannel.eventLoop.makePromise(of: Channel.self) | ||
channelPromise!.completeWith(promise.futureResult.map { value in value as! Channel }) | ||
} else { | ||
channelPromise = nil | ||
} | ||
self.baseChannel.configure(initializer: initializer, userPromise: channelPromise) | ||
case .returnsAny(let initializer): | ||
self.baseChannel.configure(initializer: initializer, userPromise: promise) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a bit confused here but might just lack the context. Are we using both cases because one is the inbound stream and the other is creating a new outbound stream case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I don't follow the question. Both cases of what? I'll take a punt at clarifying some things.
For initializer type:
.includesStreamID
is the legacy case.excludesStreamID
is the non-async case.returnsAny
is the async case
We optionally remap the promise type if we have one from Any
to Channel
but only if we have one. In the past we didn't supply a promise on the inbound path so if we aren't in async world then we don't supply one now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thing I am confused then is why did something in the implementation for .excludesStreamID
change now. If this is the non-async case can we actually hit this with a non-nil promise?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, good point. No, we can't hit that in the non-async case, I'll remove it.
Motivation: When adding in the SPI methods for exposing async sequences of HTTP/2 streams we moved the stream initialization to a subtly different location so that it was easier to exfiltrate the outputs of those initialization functions (such as protocol negotiation outputs). In some cases this broke ordering expectations. Modifications: Yield the (optionally wrapped) channels as initializer outputs to the async stream Result: Changes only exist within SPI. Async inbound stream channel initialization now matches previous behavior.
This reverts commit 53be8d4.
Motivation: When adding in the SPI methods for exposing async sequences of HTTP/2 streams we moved the stream initialization to a subtly different location so that it was easier to exfiltrate the outputs of those initialization functions (such as protocol negotiation outputs). In some cases this broke ordering expectations. Modifications: Yield the (optionally wrapped) channels as initializer outputs to the async stream when the initialization and activation steps succeed. Result: Changes only exist within SPI. Async inbound stream channel initialization now matches previous behavior.
and other review comments
554a6ff
to
e9d703b
Compare
@@ -989,9 +1004,13 @@ extension NIOHTTP2Handler { | |||
#if swift(>=5.7) | |||
/// The type of all `inboundStreamInitializer` callbacks which do not need to return data. | |||
public typealias StreamInitializer = NIOChannelInitializer | |||
/// The type of NIO Channel initializer callbacks which need to return untyped data. | |||
internal typealias StreamInitializerWithAnyOutput = @Sendable (Channel) -> EventLoopFuture<Any> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to think ahead with strict concurrency checking: should the return be EventLoopFuture<any Sendable>
rather than just Any
? Otherwise the closure cannot be Sendable
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that makes sense to me
// Note: Firing the initial (header) frame before calling `HTTP2StreamChannel.configureInboundStream(initializer:)` | ||
// is crucial to preserve frame order, since the initialization process might trigger another read on the parent | ||
// channel which in turn might cause further frames to be processed synchronously. | ||
channel.receiveInboundFrame(frame) | ||
channel.configureInboundStream(initializer: self.inboundStreamStateInitializer) | ||
|
||
let initializerProductPromise = self.streamChannelContinuation == nil ? nil : self.channel.eventLoop.makePromise(of: Any.self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes more sense to arrange this so that you set up the promise and callback before configuring; you can reduce branching and avoid the '!':
let initializerProductPromise: EventLoopPromise<Any>?
if let continuation = self.streamChannelContinuation {
let promise = self.channel.eventLoop.makePromise(of: Any.self)
promise.futureResult.whenSuccess { ... }
initializerProductPromise = promise
} else {
initializerProductPromise = nil
}
channel.configureInboundStream(...)
let anyPromise: EventLoopPromise<Any>? | ||
if let promise = promise { | ||
anyPromise = channel.baseChannel.eventLoop.makePromise(of: Any.self) | ||
promise.completeWith(anyPromise!.futureResult.map { value in value as! Output }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This definitely needs a comment!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can save an allocation here by avoiding a future and completing promise
in the whenComplete
block of anyPromise
:
anyPromise.futureResult.whenComplete {
switch $0 {
// ...
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM! If we just fix up the last comments from @glbrntt
@@ -989,9 +1004,13 @@ extension NIOHTTP2Handler { | |||
#if swift(>=5.7) | |||
/// The type of all `inboundStreamInitializer` callbacks which do not need to return data. | |||
public typealias StreamInitializer = NIOChannelInitializer | |||
/// The type of NIO Channel initializer callbacks which need to return untyped data. | |||
internal typealias StreamInitializerWithAnyOutput = @Sendable (Channel) -> EventLoopFuture<Any> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that makes sense to me
e82124e
to
15a6560
Compare
15a6560
to
c43f8fd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple of minor nits left otherwise LGTM!
@@ -439,6 +439,7 @@ extension Channel { | |||
/// - configuration: The settings that will be used when establishing the connection and new streams. | |||
/// - position: The position in the pipeline into which to insert this handler. | |||
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. | |||
/// It must output the stream `Channel` (either directly or wrapped in some form) to allow users to iterate over inbound streams. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment confuses me on all the methods here. The users can return whatever they want inside the initializer right. They might multiplex another HTTP2
connection side a single stream and the return will be a NIOHTTP2Handler.AsyncStreamMultiplexer<NIOHTTP2Handler.AsyncStreamMultiplexer<Output>>>
.
Additionally, we should make Output: Sendable
here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, this isn't correct, I was attempting to give guidance on the intent of the Output
type but it's too narrow a description. I've reworded this to reflect the broader use.
@@ -537,6 +538,7 @@ extension Channel { | |||
/// - http2ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol | |||
/// is HTTP/2 to configure the connection channel. | |||
/// - http2InboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. | |||
/// It must output the stream `Channel` (either directly or wrapped in some form) to allow users to iterate over inbound streams. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should require all generic types to be Sendable
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One nit, looks good otherwise.
@@ -239,7 +239,7 @@ extension NIOHTTP2Handler { | |||
} | |||
|
|||
/// Create a stream channel initialized with the provided closure | |||
public func createStreamChannel<OutboundStreamOutput>(_ initializer: @escaping NIOChannelInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput { | |||
public func createStreamChannel<OutboundStreamOutput: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think OutboundStreamOutput
is unnecessarily verbose here: Output
is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Output
should be enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally still think we should call this method createOutboundStream
so that it aligns with the inboundStreamInitializer
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name aligns with the equivalent method on the non-async multiplexer, I think that alignment is more important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally don't think that is more important because this here is the chance for a new name. I recently saw somebody giving feedback on exactly this API that they are confused what initializer runs when.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think createOutboundStream
is misleading: it sounds like it's a unidirectional stream to me, which it isn't. (FWIW I have the same issue with our inbound stream initializers.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks @rnro
These are all SPI changes so are safe to ignore:
|
Motivation:
When adding in the SPI methods for exposing async sequences of HTTP/2 streams we moved the stream initialization to a subtly different location so that it was easier to exfiltrate the outputs of those initialization functions (such as protocol negotiation outputs).
In some cases this broke ordering expectations.
Modifications:
Yield the (optionally wrapped) channels as initializer outputs to the async stream when the initialization and activation steps succeed.
Result:
Changes only exist within SPI. Async inbound stream channel initialization now matches previous behavior.