Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP/2 Async API #424

Merged
merged 7 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,35 @@ extension InlineStreamMultiplexer {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}

extension NIOHTTP2Handler {
/// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and
/// provides access to inbound HTTP/2 streams.
///
/// In general in NIO applications it is helpful to consider each HTTP/2 stream as an
/// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a
/// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate
/// on ``HTTP2Frame/FramePayload`` objects as their base communication
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer`
/// and `IOData`.
///
/// Outbound stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rephrase that to talk about opening new streams to a remote and uptime the streamStateInitializer

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

/// Create a stream channel initialized with the provided closure
public func openStream<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we document the initializer closure here please

return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}
}
}
12 changes: 12 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1180,4 +1180,16 @@ extension NIOHTTP2Handler {
throw NIOHTTP2Errors.missingMultiplexer()
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
case .some(.legacy), .none:
throw NIOHTTP2Errors.missingMultiplexer()
}
}
}
120 changes: 120 additions & 0 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,123 @@ internal protocol AnyContinuation {
func finish()
func finish(throwing error: Error)
}


/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as a generic `AsyncSequence`.
/// They make use of generics to allow for wrapping the stream `Channel`s, for example as `NIOAsyncChannel`s or protocol negotiation objects.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we drop inbound here?

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we nest this type inside the AsyncStreamMultiplexer ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion here but I'm not convinced. I don't know if this type has to be tightly coupled to the async multiplexer and since this is public API maybe we should keep our options open?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough!

public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = Output

private var iterator: AsyncThrowingStream<Output, Error>.AsyncIterator

init(_ iterator: AsyncThrowingStream<Output, Error>.AsyncIterator) {
self.iterator = iterator
}

public mutating func next() async throws -> Output? {
try await self.iterator.next()
}
}

public typealias Element = Output

private let asyncThrowingStream: AsyncThrowingStream<Output, Error>

private init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
self.asyncThrowingStream = asyncThrowingStream
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(self.asyncThrowingStream.makeAsyncIterator())
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels {
/// `Continuation` is a wrapper for a generic `AsyncThrowingStream` to which inbound HTTP2 stream channels are yielded..
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct Continuation: AnyContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation

internal init(
continuation: AsyncThrowingStream<Output, Error>.Continuation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to use wrapping as the label in the init for wrapper types. It makes it a little more obvious that the type really is just a simple wrapper.

) {
self.continuation = continuation
}

/// `yield` takes a channel as outputted by the stream initializer and yields the wrapped `AsyncThrowingStream`.
///
/// It takes channels as as `Any` type to allow wrapping by the stream initializer.
func yield(any: Any) {
let yieldResult = self.continuation.yield(any as! Output)
switch yieldResult {
case .enqueued:
break // success, nothing to do
case .dropped:
preconditionFailure("Attempted to yield when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
case .terminated:
preconditionFailure("Attempted to yield to AsyncThrowingStream in terminated state.")
default:
preconditionFailure("Attempt to yield to AsyncThrowingStream failed for unhandled reason.")
}
}

/// `finish` marks the continuation as finished.
func finish() {
self.continuation.finish()
}

/// `finish` marks the continuation as finished with the supplied error.
func finish(throwing error: Error) {
self.continuation.finish(throwing: error)
}
}


/// `initialize` creates a new `Continuation` object and returns it along with its backing `AsyncThrowingStream`.
/// The `StreamChannelContinuation` provides access to the inbound HTTP2 stream channels.
///
/// - Parameters:
/// - inboundStreamInititializer: A closure which initializes the newly-created inbound stream channel and returns a generic.
/// The returned type corresponds to the output of the channel once the operations in the initializer have been performed.
/// For example an `inboundStreamInititializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would
/// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is
/// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`.
static func initialize(inboundStreamInitializerOutput: Output.Type = Output.self) -> (NIOHTTP2InboundStreamChannels<Output>, Continuation) {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self)
return (.init(stream), Continuation(continuation: continuation))
}
}

#if swift(>=5.7)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are 5.7+ only right? So we can drop those guards

// This doesn't compile on 5.6 but the omission of Sendable is sufficient in any case
@available(*, unavailable)
extension NIOHTTP2InboundStreamChannels.AsyncIterator: Sendable {}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: Sendable where Output: Sendable {}
#else
// This wasn't marked as sendable in 5.6 however it should be fine
// https://forums.swift.org/t/so-is-asyncstream-sendable-or-not/53148/2
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: @unchecked Sendable where Output: Sendable {}
#endif


#if swift(<5.9)
// this should be available in the std lib from 5.9 onwards
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension AsyncThrowingStream {
public static func makeStream(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be public right?

of elementType: Element.Type = Element.self,
throwing failureType: Failure.Type = Failure.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) where Failure == Error {
var continuation: AsyncThrowingStream<Element, Failure>.Continuation!
let stream = AsyncThrowingStream<Element, Failure>(bufferingPolicy: limit) { continuation = $0 }
return (stream: stream, continuation: continuation!)
}
}
#endif
Loading