Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP/2 Async API #424

Merged
merged 7 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let package = Package(
.library(name: "NIOHTTP2", targets: ["NIOHTTP2"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.58.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.60.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
],
Expand Down
39 changes: 39 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,42 @@ extension InlineStreamMultiplexer {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}

extension NIOHTTP2Handler {
/// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and
/// provides access to inbound HTTP/2 streams.
///
/// In general in NIO applications it is helpful to consider each HTTP/2 stream as an
/// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a
/// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate
/// on ``HTTP2Frame/FramePayload`` objects as their base communication
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer`
/// and `IOData`.
///
/// Inbound (remotely-initiated) streams are accessible via the ``inbound`` property, having been initialized and
/// returned as the `InboundStreamOutput` type.
///
/// You can open a stream by calling ``openStream(_:)``. Locally-initiated stream channel objects are initialized upon creation using the supplied `initializer` which returns a type
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2AsyncSequence<InboundStreamOutput>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2AsyncSequence<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}


/// Create a stream channel initialized with the provided closure
/// - Parameter initializer: A closure that will be called upon the created stream which is responsible for
/// initializing the stream's `Channel`.
/// - Returns: The result of the `initializer`.
public func openStream<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we document the initializer closure here please

return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}
}
}
12 changes: 12 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1180,4 +1180,16 @@ extension NIOHTTP2Handler {
throw NIOHTTP2Errors.missingMultiplexer()
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2AsyncSequence<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
case .some(.legacy), .none:
throw NIOHTTP2Errors.missingMultiplexer()
}
}
}
114 changes: 112 additions & 2 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,120 @@ extension HTTP2CommonInboundStreamMultiplexer {
}
}

/// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held
/// by the `HTTP2ChannelHandler` without causing it to become generic itself.
/// `AnyContinuation` is used to generic async-sequence-like objects to deal with the generic element types without
/// the holding type becoming generic itself.
///
/// This is useful in in the case of the `HTTP2ChannelHandler` which must deal with types which hold stream initializers
/// which have a generic return type.
internal protocol AnyContinuation {
func yield(any: Any)
func finish()
func finish(throwing error: Error)
}


/// `NIOHTTP2AsyncSequence` is an implementation of the `AsyncSequence` protocol which allows iteration over a generic
/// element type `Output`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct NIOHTTP2AsyncSequence<Output>: AsyncSequence {
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = Output

private var iterator: AsyncThrowingStream<Output, Error>.AsyncIterator

init(wrapping iterator: AsyncThrowingStream<Output, Error>.AsyncIterator) {
self.iterator = iterator
}

public mutating func next() async throws -> Output? {
try await self.iterator.next()
}
}

public typealias Element = Output

private let asyncThrowingStream: AsyncThrowingStream<Output, Error>

private init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
self.asyncThrowingStream = asyncThrowingStream
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(wrapping: self.asyncThrowingStream.makeAsyncIterator())
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2AsyncSequence {
/// `Continuation` is a wrapper for a generic `AsyncThrowingStream` to which the products of the initializers of
/// inbound (remotely-initiated) HTTP/2 stream channels are yielded.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct Continuation: AnyContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation

internal init(wrapping continuation: AsyncThrowingStream<Output, Error>.Continuation) {
self.continuation = continuation
}

/// `yield` takes a channel as outputted by the stream initializer and yields the wrapped `AsyncThrowingStream`.
///
/// It takes channels as as `Any` type to allow wrapping by the stream initializer.
func yield(any: Any) {
let yieldResult = self.continuation.yield(any as! Output)
switch yieldResult {
case .enqueued:
break // success, nothing to do
case .dropped:
preconditionFailure("Attempted to yield when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
case .terminated:
preconditionFailure("Attempted to yield to AsyncThrowingStream in terminated state.")
default:
preconditionFailure("Attempt to yield to AsyncThrowingStream failed for unhandled reason.")
}
}

/// `finish` marks the continuation as finished.
func finish() {
self.continuation.finish()
}

/// `finish` marks the continuation as finished with the supplied error.
func finish(throwing error: Error) {
self.continuation.finish(throwing: error)
}
}


/// `initialize` creates a new `Continuation` object and returns it along with its backing ``NIOHTTP2AsyncSequence``.
/// The `Continuation` provides the ability to yield to the backing .``NIOHTTP2AsyncSequence``.
///
/// - Parameters:
/// - inboundStreamInitializerOutput: The type which is returned by the initializer operating on the inbound
/// (remotely-initiated) HTTP/2 streams.
static func initialize(inboundStreamInitializerOutput: Output.Type = Output.self) -> (NIOHTTP2AsyncSequence<Output>, Continuation) {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self)
return (.init(stream), Continuation(wrapping: continuation))
}
}

@available(*, unavailable)
extension NIOHTTP2AsyncSequence.AsyncIterator: Sendable {}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2AsyncSequence: Sendable where Output: Sendable {}

#if swift(<5.9)
// this should be available in the std lib from 5.9 onwards
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension AsyncThrowingStream {
static func makeStream(
of elementType: Element.Type = Element.self,
throwing failureType: Failure.Type = Failure.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) where Failure == Error {
var continuation: AsyncThrowingStream<Element, Failure>.Continuation!
let stream = AsyncThrowingStream<Element, Failure>(bufferingPolicy: limit) { continuation = $0 }
return (stream: stream, continuation: continuation!)
}
}
#endif
Loading