diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift index 98b35f12..5b047fcb 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift @@ -140,15 +140,15 @@ extension InlineStreamMultiplexer { } extension InlineStreamMultiplexer { - internal func createStreamChannel(promise: EventLoopPromise?, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture) { + internal func createStreamChannel(promise: EventLoopPromise?, _ streamStateInitializer: @escaping NIOChannelInitializer) { self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), promise: promise, streamStateInitializer) } - internal func createStreamChannel(_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture) -> EventLoopFuture { + internal func createStreamChannel(_ streamStateInitializer: @escaping NIOChannelInitializer) -> EventLoopFuture { self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer) } - internal func createStreamChannel(_ initializer: @escaping NIOChannelInitializerWithOutput) -> EventLoopFuture { + internal func createStreamChannel(_ initializer: @escaping NIOChannelInitializerWithOutput) -> EventLoopFuture { self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), initializer) } } @@ -207,7 +207,7 @@ extension NIOHTTP2Handler { } extension InlineStreamMultiplexer { - func setChannelContinuation(_ streamChannels: any ChannelContinuation) { + func setChannelContinuation(_ streamChannels: any AnyContinuation) { self.commonStreamMultiplexer.setChannelContinuation(streamChannels) } } @@ -224,7 +224,7 @@ extension NIOHTTP2Handler { /// and `IOData`. /// /// Outbound stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type - /// `OutboundStreamOutput`. This type may be `HTTP2Frame` or changed to any other type. + /// `Output`. This type may be `HTTP2Frame` or changed to any other type. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @_spi(AsyncChannel) public struct AsyncStreamMultiplexer { @@ -232,38 +232,15 @@ extension NIOHTTP2Handler { public let inbound: NIOHTTP2InboundStreamChannels // Cannot be created by users. - internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels) { + internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels) { self.inlineStreamMultiplexer = inlineStreamMultiplexer self.inlineStreamMultiplexer.setChannelContinuation(continuation) self.inbound = inboundStreamChannels } /// Create a stream channel initialized with the provided closure - public func createStreamChannel(_ initializer: @escaping NIOChannelInitializerWithOutput) async throws -> OutboundStreamOutput { + public func createStreamChannel(_ initializer: @escaping NIOChannelInitializerWithOutput) async throws -> Output { return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get() } - - - /// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`. - /// - /// - Parameters: - /// - configuration: Configuration for the ``NIOAsyncChannel`` wrapping the HTTP/2 stream channel. - /// - initializer: A callback that will be invoked to allow you to configure the - /// `ChannelPipeline` for the newly created channel. - @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - @_spi(AsyncChannel) - public func createStreamChannel( - configuration: NIOAsyncChannel.Configuration = .init(), - initializer: @escaping NIOChannelInitializer - ) async throws -> NIOAsyncChannel { - return try await self.createStreamChannel { channel in - initializer(channel).flatMapThrowing { _ in - return try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: configuration - ) - } - } - } } } diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift index 648c0433..7ddf9df6 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift @@ -120,6 +120,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { private enum InboundStreamMultiplexerState { case uninitializedLegacy case uninitializedInline(StreamConfiguration, StreamInitializer, NIOHTTP2StreamDelegate?) + case uninitializedAsync(StreamConfiguration, StreamInitializerWithAnyOutput, NIOHTTP2StreamDelegate?) case initialized(InboundStreamMultiplexer) case deinitialized @@ -127,7 +128,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { switch self { case .initialized(let inboundStreamMultiplexer): return inboundStreamMultiplexer - case .uninitializedLegacy, .uninitializedInline, .deinitialized: + case .uninitializedLegacy, .uninitializedInline, .uninitializedAsync, .deinitialized: return nil } } @@ -153,6 +154,20 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { ) )) + case .uninitializedAsync(let streamConfiguration, let inboundStreamInitializer, let streamDelegate): + self = .initialized(.inline( + InlineStreamMultiplexer( + context: context, + outboundView: .init(http2Handler: http2Handler), + mode: mode, + inboundStreamStateInitializer: .returnsAny(inboundStreamInitializer), + targetWindowSize: max(0, min(streamConfiguration.targetWindowSize, Int(Int32.max))), + streamChannelOutboundBytesHighWatermark: streamConfiguration.outboundBufferSizeHighWatermark, + streamChannelOutboundBytesLowWatermark: streamConfiguration.outboundBufferSizeLowWatermark, + streamDelegate: streamDelegate + ) + )) + case .initialized: break //no-op } @@ -989,9 +1004,13 @@ extension NIOHTTP2Handler { #if swift(>=5.7) /// The type of all `inboundStreamInitializer` callbacks which do not need to return data. public typealias StreamInitializer = NIOChannelInitializer + /// The type of NIO Channel initializer callbacks which need to return untyped data. + internal typealias StreamInitializerWithAnyOutput = @Sendable (Channel) -> EventLoopFuture #else /// The type of all `inboundStreamInitializer` callbacks which need to return data. public typealias StreamInitializer = NIOChannelInitializer + /// The type of NIO Channel initializer callbacks which need to return untyped data. + internal typealias StreamInitializerWithAnyOutput = (Channel) -> EventLoopFuture #endif /// Creates a new ``NIOHTTP2Handler`` with a local multiplexer. (i.e. using @@ -1014,17 +1033,39 @@ extension NIOHTTP2Handler { streamDelegate: NIOHTTP2StreamDelegate? = nil, inboundStreamInitializer: @escaping StreamInitializer ) { - self.init(mode: mode, - eventLoop: eventLoop, - initialSettings: connectionConfiguration.initialSettings, - headerBlockValidation: connectionConfiguration.headerBlockValidation, - contentLengthValidation: connectionConfiguration.contentLengthValidation, - maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames, - maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames + self.init( + mode: mode, + eventLoop: eventLoop, + initialSettings: connectionConfiguration.initialSettings, + headerBlockValidation: connectionConfiguration.headerBlockValidation, + contentLengthValidation: connectionConfiguration.contentLengthValidation, + maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames, + maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames ) + self.inboundStreamMultiplexerState = .uninitializedInline(streamConfiguration, inboundStreamInitializer, streamDelegate) } + internal convenience init( + mode: ParserMode, + eventLoop: EventLoop, + connectionConfiguration: ConnectionConfiguration = .init(), + streamConfiguration: StreamConfiguration = .init(), + streamDelegate: NIOHTTP2StreamDelegate? = nil, + inboundStreamInitializerWithAnyOutput: @escaping StreamInitializerWithAnyOutput + ) { + self.init( + mode: mode, + eventLoop: eventLoop, + initialSettings: connectionConfiguration.initialSettings, + headerBlockValidation: connectionConfiguration.headerBlockValidation, + contentLengthValidation: connectionConfiguration.contentLengthValidation, + maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames, + maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames + ) + self.inboundStreamMultiplexerState = .uninitializedAsync(streamConfiguration, inboundStreamInitializerWithAnyOutput, streamDelegate) + } + /// Connection-level configuration. /// /// The settings that will be used when establishing the connection. These will be sent to the peer as part of the @@ -1101,7 +1142,7 @@ extension NIOHTTP2Handler { } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - internal func syncAsyncStreamMultiplexer(continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels) throws -> AsyncStreamMultiplexer { + internal func syncAsyncStreamMultiplexer(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels) throws -> AsyncStreamMultiplexer { self.eventLoop!.preconditionInEventLoop() switch self.inboundStreamMultiplexer { diff --git a/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift index 2422bac3..47e11a05 100644 --- a/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift @@ -38,7 +38,7 @@ internal class HTTP2CommonInboundStreamMultiplexer { private var isReading = false private var flushPending = false - var streamChannelContinuation: (any ChannelContinuation)? + var streamChannelContinuation: (any AnyContinuation)? init( mode: NIOHTTP2Handler.ParserMode, @@ -103,17 +103,24 @@ extension HTTP2CommonInboundStreamMultiplexer { self.streams[streamID] = channel - // If we have an async sequence of inbound stream channels yield the channel to it - // This also implicitly performs the stream initialization step. - // Note that in this case the API is constructed such that `self.inboundStreamStateInitializer` - // does no actual work. - self.streamChannelContinuation?.yield(channel: channel.baseChannel) - // Note: Firing the initial (header) frame before calling `HTTP2StreamChannel.configureInboundStream(initializer:)` // is crucial to preserve frame order, since the initialization process might trigger another read on the parent // channel which in turn might cause further frames to be processed synchronously. channel.receiveInboundFrame(frame) - channel.configureInboundStream(initializer: self.inboundStreamStateInitializer) + + // Configure the inbound stream. + // If we have an async sequence of inbound stream channels yield the channel to it + // but only once we are sure initialization and activation succeed + if let streamChannelContinuation = self.streamChannelContinuation { + let promise = self.channel.eventLoop.makePromise(of: Any.self) + promise.futureResult.whenSuccess { value in + streamChannelContinuation.yield(any: value) + } + + channel.configureInboundStream(initializer: self.inboundStreamStateInitializer, promise: promise) + } else { + channel.configureInboundStream(initializer: self.inboundStreamStateInitializer) + } if !channel.inList { self.didReadChannels.append(channel) @@ -286,7 +293,7 @@ extension HTTP2CommonInboundStreamMultiplexer { } extension HTTP2CommonInboundStreamMultiplexer { - internal func _createStreamChannel( + internal func _createStreamChannel( _ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, _ promise: EventLoopPromise?, _ streamStateInitializer: @escaping NIOChannelInitializerWithOutput @@ -304,10 +311,32 @@ extension HTTP2CommonInboundStreamMultiplexer { inboundStreamStateInitializer: .excludesStreamID(nil) ) self.pendingStreams[channel.channelID] = channel - channel.configure(initializer: streamStateInitializer, userPromise: promise) + + let anyInitializer: NIOChannelInitializerWithOutput = { channel in + streamStateInitializer(channel).map { return $0 } + } + + let anyPromise: EventLoopPromise? + if let promise = promise { + anyPromise = channel.baseChannel.eventLoop.makePromise(of: Any.self) + anyPromise?.futureResult.whenComplete { result in + switch result { + case .success(let any): + // The cast through any here is unfortunate but the only way to make this work right now + // since the HTTP2ChildChannel and the multiplexer is not generic over the output of the initializer. + promise.succeed(any as! Output) + case .failure(let error): + promise.fail(error) + } + } + } else { + anyPromise = nil + } + + channel.configure(initializer: anyInitializer, userPromise: anyPromise) } - internal func createStreamChannel( + internal func createStreamChannel( multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, promise: EventLoopPromise?, _ streamStateInitializer: @escaping NIOChannelInitializerWithOutput @@ -320,7 +349,7 @@ extension HTTP2CommonInboundStreamMultiplexer { } } - internal func createStreamChannel( + internal func createStreamChannel( multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, _ streamStateInitializer: @escaping NIOChannelInitializerWithOutput ) -> EventLoopFuture { @@ -332,7 +361,7 @@ extension HTTP2CommonInboundStreamMultiplexer { internal func _createStreamChannel( _ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, _ promise: EventLoopPromise?, - _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture + _ streamStateInitializer: @escaping NIOChannelInitializer ) { let channel = MultiplexerAbstractChannel( allocator: self.channel.allocator, @@ -345,13 +374,14 @@ extension HTTP2CommonInboundStreamMultiplexer { inboundStreamStateInitializer: .excludesStreamID(nil) ) self.pendingStreams[channel.channelID] = channel + channel.configure(initializer: streamStateInitializer, userPromise: promise) } internal func createStreamChannel( multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, promise: EventLoopPromise?, - _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture + _ streamStateInitializer: @escaping NIOChannelInitializer ) { // Always create streams channels on the next event loop tick. This avoids re-entrancy // issues where handlers interposed between the two HTTP/2 handlers could create streams @@ -363,7 +393,7 @@ extension HTTP2CommonInboundStreamMultiplexer { internal func createStreamChannel( multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, - _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture) -> EventLoopFuture { + _ streamStateInitializer: @escaping NIOChannelInitializer) -> EventLoopFuture { let promise = self.channel.eventLoop.makePromise(of: Channel.self) self.createStreamChannel(multiplexer: multiplexer, promise: promise, streamStateInitializer) return promise.futureResult @@ -408,7 +438,7 @@ extension HTTP2CommonInboundStreamMultiplexer { } extension HTTP2CommonInboundStreamMultiplexer { - func setChannelContinuation(_ streamChannels: any ChannelContinuation) { + func setChannelContinuation(_ streamChannels: any AnyContinuation) { self.channel.eventLoop.assertInEventLoop() self.streamChannelContinuation = streamChannels } @@ -416,76 +446,15 @@ extension HTTP2CommonInboundStreamMultiplexer { /// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held /// by the `HTTP2ChannelHandler` without causing it to become generic itself. -internal protocol ChannelContinuation { - func yield(channel: Channel) +internal protocol AnyContinuation { + func yield(any: Any) func finish() func finish(throwing error: Error) } -/// `StreamChannelContinuation` is a wrapper for a generic `AsyncThrowingStream` which holds the inbound HTTP2 stream channels. -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -struct StreamChannelContinuation: ChannelContinuation { - private var continuation: AsyncThrowingStream.Continuation - private let inboundStreamInititializer: NIOChannelInitializerWithOutput - - private init( - continuation: AsyncThrowingStream.Continuation, - inboundStreamInititializer: @escaping NIOChannelInitializerWithOutput - ) { - self.continuation = continuation - self.inboundStreamInititializer = inboundStreamInititializer - } - - /// `initialize` creates a new `StreamChannelContinuation` object and returns it along with its backing `AsyncThrowingStream`. - /// The `StreamChannelContinuation` provides access to the inbound HTTP2 stream channels. - /// - /// - Parameters: - /// - inboundStreamInititializer: A closure which initializes the newly-created inbound stream channel and returns a generic. - /// The returned type corresponds to the output of the channel once the operations in the initializer have been performed. - /// For example an `inboundStreamInititializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would - /// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is - /// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`. - static func initialize( - with inboundStreamInititializer: @escaping NIOChannelInitializerWithOutput - ) -> (StreamChannelContinuation, NIOHTTP2InboundStreamChannels) { - let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self) - return (StreamChannelContinuation(continuation: continuation, inboundStreamInititializer: inboundStreamInititializer), NIOHTTP2InboundStreamChannels(stream)) - } - - /// `yield` takes a channel, executes the stored `streamInitializer` upon it and then yields the *derived* type to - /// the wrapped `AsyncThrowingStream`. - func yield(channel: Channel) { - channel.eventLoop.assertInEventLoop() - self.inboundStreamInititializer(channel).whenSuccess { output in - let yieldResult = self.continuation.yield(output) - switch yieldResult { - case .enqueued: - break // success, nothing to do - case .dropped: - preconditionFailure("Attempted to yield channel when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.") - case .terminated: - channel.close(mode: .all, promise: nil) - preconditionFailure("Attempted to yield channel to AsyncThrowingStream in terminated state.") - default: - channel.close(mode: .all, promise: nil) - preconditionFailure("Attempt to yield channel to AsyncThrowingStream failed for unhandled reason.") - } - } - } - - /// `finish` marks the continuation as finished. - func finish() { - self.continuation.finish() - } - - /// `finish` marks the continuation as finished with the supplied error. - func finish(throwing error: Error) { - self.continuation.finish(throwing: error) - } -} - -/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as an `AsyncSequence`. +/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as a generic `AsyncSequence`. +/// They make use of generics to allow for wrapping the stream `Channel`s, for example as `NIOAsyncChannel`s or protocol negotiation objects. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @_spi(AsyncChannel) public struct NIOHTTP2InboundStreamChannels: AsyncSequence { @@ -507,7 +476,7 @@ public struct NIOHTTP2InboundStreamChannels: AsyncSequence { private let asyncThrowingStream: AsyncThrowingStream - init(_ asyncThrowingStream: AsyncThrowingStream) { + private init(_ asyncThrowingStream: AsyncThrowingStream) { self.asyncThrowingStream = asyncThrowingStream } @@ -516,6 +485,63 @@ public struct NIOHTTP2InboundStreamChannels: AsyncSequence { } } +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension NIOHTTP2InboundStreamChannels { + /// `Continuation` is a wrapper for a generic `AsyncThrowingStream` to which inbound HTTP2 stream channels are yielded.. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + struct Continuation: AnyContinuation { + private var continuation: AsyncThrowingStream.Continuation + + internal init( + continuation: AsyncThrowingStream.Continuation + ) { + self.continuation = continuation + } + + /// `yield` takes a channel as outputted by the stream initializer and yields the wrapped `AsyncThrowingStream`. + /// + /// It takes channels as as `Any` type to allow wrapping by the stream initializer. + func yield(any: Any) { + let yieldResult = self.continuation.yield(any as! Output) + switch yieldResult { + case .enqueued: + break // success, nothing to do + case .dropped: + preconditionFailure("Attempted to yield when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.") + case .terminated: + preconditionFailure("Attempted to yield to AsyncThrowingStream in terminated state.") + default: + preconditionFailure("Attempt to yield to AsyncThrowingStream failed for unhandled reason.") + } + } + + /// `finish` marks the continuation as finished. + func finish() { + self.continuation.finish() + } + + /// `finish` marks the continuation as finished with the supplied error. + func finish(throwing error: Error) { + self.continuation.finish(throwing: error) + } + } + + + /// `initialize` creates a new `Continuation` object and returns it along with its backing `AsyncThrowingStream`. + /// The `StreamChannelContinuation` provides access to the inbound HTTP2 stream channels. + /// + /// - Parameters: + /// - inboundStreamInititializer: A closure which initializes the newly-created inbound stream channel and returns a generic. + /// The returned type corresponds to the output of the channel once the operations in the initializer have been performed. + /// For example an `inboundStreamInititializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would + /// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is + /// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`. + static func initialize(inboundStreamInitializerOutput: Output.Type = Output.self) -> (NIOHTTP2InboundStreamChannels, Continuation) { + let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self) + return (.init(stream), Continuation(continuation: continuation)) + } +} + #if swift(>=5.7) // This doesn't compile on 5.6 but the omission of Sendable is sufficient in any case @available(*, unavailable) diff --git a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift index d1e99570..e36f7f01 100644 --- a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift +++ b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift @@ -439,11 +439,12 @@ extension Channel { /// - configuration: The settings that will be used when establishing the connection and new streams. /// - position: The position in the pipeline into which to insert this handler. /// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer /// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @_spi(AsyncChannel) - public func configureAsyncHTTP2Pipeline( + public func configureAsyncHTTP2Pipeline( mode: NIOHTTP2Handler.ParserMode, configuration: NIOHTTP2Handler.Configuration = .init(), position: ChannelPipeline.Position = .last, @@ -496,7 +497,7 @@ extension Channel { /// channel has been fully mutated. /// - Returns: An `EventLoopFuture` of an `EventLoopFuture` containing the `NIOProtocolNegotiationResult` that completes when the channel /// is ready to negotiate. - internal func configureHTTP2AsyncSecureUpgrade( + internal func configureHTTP2AsyncSecureUpgrade( http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput ) -> EventLoopFuture>>> { @@ -537,12 +538,13 @@ extension Channel { /// - http2ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol /// is HTTP/2 to configure the connection channel. /// - http2InboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer /// - Returns: An `EventLoopFuture` containing a ``NIOTypedApplicationProtocolNegotiationHandler`` that completes when the channel /// is ready to negotiate. This can then be used to access the ``NIOProtocolNegotiationResult`` which may itself /// be waited on to retrieve the result of the negotiation. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @_spi(AsyncChannel) - public func configureAsyncHTTPServerPipeline( + public func configureAsyncHTTPServerPipeline( http2Configuration: NIOHTTP2Handler.Configuration = .init(), http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, @@ -589,11 +591,12 @@ extension ChannelPipeline.SynchronousOperations { /// - configuration: The settings that will be used when establishing the connection and new streams. /// - position: The position in the pipeline into which to insert this handler. /// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer /// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @_spi(AsyncChannel) - public func configureAsyncHTTP2Pipeline( + public func configureAsyncHTTP2Pipeline( mode: NIOHTTP2Handler.ParserMode, configuration: NIOHTTP2Handler.Configuration = .init(), position: ChannelPipeline.Position = .last, @@ -604,12 +607,14 @@ extension ChannelPipeline.SynchronousOperations { eventLoop: self.eventLoop, connectionConfiguration: configuration.connection, streamConfiguration: configuration.stream, - inboundStreamInitializer: { channel in channel.eventLoop.makeSucceededVoidFuture() } + inboundStreamInitializerWithAnyOutput: { channel in + inboundStreamInitializer(channel).map { return $0 } + } ) try self.addHandler(handler, position: position) - let (continuation, inboundStreamChannels) = StreamChannelContinuation.initialize(with: inboundStreamInitializer) + let (inboundStreamChannels, continuation) = NIOHTTP2InboundStreamChannels.initialize(inboundStreamInitializerOutput: Output.self) return try handler.syncAsyncStreamMultiplexer(continuation: continuation, inboundStreamChannels: inboundStreamChannels) } diff --git a/Sources/NIOHTTP2/HTTP2StreamChannel.swift b/Sources/NIOHTTP2/HTTP2StreamChannel.swift index c8ff0181..23cb71db 100644 --- a/Sources/NIOHTTP2/HTTP2StreamChannel.swift +++ b/Sources/NIOHTTP2/HTTP2StreamChannel.swift @@ -198,7 +198,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore { self.onInitializationResult(result.map { self }, promise: promise) } } else { - self.postInitializerActivate(promise: promise, output: self) + self.postInitializerActivate(output: self, promise: promise) } case .failure(let error): self.configurationFailed(withError: error, promise: promise) @@ -222,7 +222,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore { self.onInitializationResult(result.map { self }, promise: promise) } } else { - self.postInitializerActivate(promise: promise, output: self) + self.postInitializerActivate(output: self, promise: promise) } case .failure(let error): self.configurationFailed(withError: error, promise: promise) @@ -230,7 +230,10 @@ final class HTTP2StreamChannel: Channel, ChannelCore { } } - internal func configure(initializer: @escaping NIOChannelInitializerWithOutput, userPromise promise: EventLoopPromise?) { + // This variant is used in the async stream case. + // It uses `Any`s because when called from `configureInboundStream` it is passed the initializer stored on the handler + // which can't be a typed generic without changing the handler API. + internal func configure(initializer: (@escaping (Channel) -> EventLoopFuture), userPromise promise: EventLoopPromise?) { assert(self.streamDataType == .framePayload) // We need to configure this channel. This involves doing four things: // 1. Setting our autoRead state from the parent @@ -242,7 +245,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore { case .success(let autoRead): self.autoRead = autoRead initializer(self).whenComplete { result in - self.onInitializationResult(result, promise: promise) + self.onInitializationResult(result.map { $0 }, promise: promise) } case .failure(let error): self.configurationFailed(withError: error, promise: promise) @@ -250,10 +253,10 @@ final class HTTP2StreamChannel: Channel, ChannelCore { } } - func onInitializationResult(_ initializerResult: Result, promise: EventLoopPromise?) { + func onInitializationResult(_ initializerResult: Result, promise: EventLoopPromise?) { switch initializerResult { case .success(let output): - self.postInitializerActivate(promise: promise, output: output) + self.postInitializerActivate(output: output, promise: promise) case .failure(let error): self.configurationFailed(withError: error, promise: promise) } @@ -276,7 +279,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore { } /// Activates the channel if the parent channel is active and succeeds the given `promise`. - private func postInitializerActivate(promise: EventLoopPromise?, output: Output) { + private func postInitializerActivate(output: Output, promise: EventLoopPromise?) { // This force unwrap is safe as parent is assigned in the initializer, and never unassigned. // If parent is not active, we expect to receive a channelActive later. if self.parent!.isActive { @@ -288,7 +291,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore { } /// Handle any error that occurred during configuration. - private func configurationFailed(withError error: Error, promise: EventLoopPromise?) { + private func configurationFailed(withError error: Error, promise: EventLoopPromise?) { switch self.state { case .idle, .localActive, .closed: // The stream isn't open on the network, nothing to close. diff --git a/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift index 03ef7ab6..e22b0a8b 100644 --- a/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift @@ -141,7 +141,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun /// channel that is created by the remote peer. For servers, these are channels created by /// receiving a `HEADERS` frame from a client. For clients, these are channels created by /// receiving a `PUSH_PROMISE` frame from a server. To initiate a new outbound channel, use - /// ``createStreamChannel(promise:_:)-18bxc``. + /// ``createStreamChannel(promise:_:)-1jk0q``. @available(*, deprecated, renamed: "init(mode:channel:targetWindowSize:outboundBufferSizeHighWatermark:outboundBufferSizeLowWatermark:inboundStreamInitializer:)") public convenience init(mode: NIOHTTP2Handler.ParserMode, channel: Channel, targetWindowSize: Int = 65535, inboundStreamStateInitializer: ((Channel, HTTP2StreamID) -> EventLoopFuture)? = nil) { // We default to an 8kB outbound buffer size: this is a good trade off for avoiding excessive buffering while ensuring that decent @@ -170,7 +170,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun /// channel that is created by the remote peer. For servers, these are channels created by /// receiving a `HEADERS` frame from a client. For clients, these are channels created by /// receiving a `PUSH_PROMISE` frame from a server. To initiate a new outbound channel, use - /// ``createStreamChannel(promise:_:)-18bxc``. + /// ``createStreamChannel(promise:_:)-1jk0q``. public convenience init(mode: NIOHTTP2Handler.ParserMode, channel: Channel, targetWindowSize: Int = 65535, @@ -199,7 +199,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun /// channel that is created by the remote peer. For servers, these are channels created by /// receiving a `HEADERS` frame from a client. For clients, these are channels created by /// receiving a `PUSH_PROMISE` frame from a server. To initiate a new outbound channel, use - /// ``createStreamChannel(promise:_:)-18bxc``. + /// ``createStreamChannel(promise:_:)-1jk0q``. @available(*, deprecated, renamed: "init(mode:channel:targetWindowSize:outboundBufferSizeHighWatermark:outboundBufferSizeLowWatermark:inboundStreamInitializer:)") public convenience init(mode: NIOHTTP2Handler.ParserMode, channel: Channel, @@ -249,7 +249,7 @@ extension HTTP2StreamMultiplexer { /// failed if an error occurs. /// - streamStateInitializer: A callback that will be invoked to allow you to configure the /// `ChannelPipeline` for the newly created channel. - public func createStreamChannel(promise: EventLoopPromise?, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture) { + public func createStreamChannel(promise: EventLoopPromise?, _ streamStateInitializer: @escaping NIOChannelInitializer) { self.commonStreamMultiplexer.createStreamChannel(multiplexer: .legacy(LegacyOutboundStreamMultiplexer(multiplexer: self)), promise: promise, streamStateInitializer) } diff --git a/Sources/NIOHTTP2/MultiplexerAbstractChannel.swift b/Sources/NIOHTTP2/MultiplexerAbstractChannel.swift index 20cb0427..3a2de26c 100644 --- a/Sources/NIOHTTP2/MultiplexerAbstractChannel.swift +++ b/Sources/NIOHTTP2/MultiplexerAbstractChannel.swift @@ -46,7 +46,7 @@ struct MultiplexerAbstractChannel { outboundBytesLowWatermark: outboundBytesLowWatermark, streamDataType: .frame) - case .excludesStreamID: + case .excludesStreamID, .returnsAny: self.baseChannel = .init(allocator: allocator, parent: parent, multiplexer: multiplexer, @@ -63,6 +63,7 @@ extension MultiplexerAbstractChannel { enum InboundStreamStateInitializer { case includesStreamID(((Channel, HTTP2StreamID) -> EventLoopFuture)?) case excludesStreamID(((Channel) -> EventLoopFuture)?) + case returnsAny(((Channel) -> EventLoopFuture)) } } @@ -95,18 +96,32 @@ extension MultiplexerAbstractChannel { self.baseChannel.configure(initializer: initializer, userPromise: nil) case .excludesStreamID(let initializer): self.baseChannel.configure(initializer: initializer, userPromise: nil) + case .returnsAny(let initializer): + self.baseChannel.configure(initializer: initializer, userPromise: nil) + } + } + + func configureInboundStream(initializer: InboundStreamStateInitializer, promise: EventLoopPromise?) { + switch initializer { + case .includesStreamID, .excludesStreamID: + preconditionFailure("Configuration with a supplied `Any` promise is not supported with this initializer type.") + case .returnsAny(let initializer): + self.baseChannel.configure(initializer: initializer, userPromise: promise) } } + // legacy `initializer` function signature func configure(initializer: ((Channel, HTTP2StreamID) -> EventLoopFuture)?, userPromise promise: EventLoopPromise?) { self.baseChannel.configure(initializer: initializer, userPromise: promise) } - func configure(initializer: ((Channel) -> EventLoopFuture)?, userPromise promise: EventLoopPromise?) { + // NIOHTTP2Handler.Multiplexer and HTTP2StreamMultiplexer + func configure(initializer: NIOChannelInitializer?, userPromise promise: EventLoopPromise?) { self.baseChannel.configure(initializer: initializer, userPromise: promise) } - func configure(initializer: @escaping NIOChannelInitializerWithOutput, userPromise promise: EventLoopPromise?) { + // used for async multiplexer + func configure(initializer: @escaping NIOChannelInitializerWithOutput, userPromise promise: EventLoopPromise?) { self.baseChannel.configure(initializer: initializer, userPromise: promise) } diff --git a/Sources/NIOHTTP2PerformanceTester/Bench1Conn10kRequests.swift b/Sources/NIOHTTP2PerformanceTester/Bench1Conn10kRequests.swift index d26e9f5b..9aef49ec 100644 --- a/Sources/NIOHTTP2PerformanceTester/Bench1Conn10kRequests.swift +++ b/Sources/NIOHTTP2PerformanceTester/Bench1Conn10kRequests.swift @@ -76,7 +76,7 @@ func setupServer(group: EventLoopGroup) throws -> Channel { func sendOneRequest(channel: Channel, multiplexer: HTTP2StreamMultiplexer) throws -> Int { let responseReceivedPromise = channel.eventLoop.makePromise(of: Int.self) - func requestStreamInitializer(channel: Channel) -> EventLoopFuture { + let requestStreamInitializer: NIOChannelInitializer = { channel in return channel.pipeline.addHandlers([HTTP2FramePayloadToHTTP1ClientCodec(httpProtocol: .https), SendRequestHandler(host: "127.0.0.1", request: .init(version: .init(major: 2, minor: 0), diff --git a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift index eede6162..a3e4f39d 100644 --- a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift +++ b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift @@ -203,13 +203,16 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase { // client for _ in 0 ..< requestCount { - let streamChannel = try await clientMultiplexer.createStreamChannel( - configuration: .init( - inboundType: HTTP2Frame.FramePayload.self, - outboundType: HTTP2Frame.FramePayload.self - ) - ) { channel -> EventLoopFuture in - channel.eventLoop.makeSucceededVoidFuture() + let streamChannel = try await clientMultiplexer.createStreamChannel() { channel in + channel.eventLoop.makeCompletedFuture { + try NIOAsyncChannel( + synchronouslyWrapping: channel, + configuration: .init( + inboundType: HTTP2Frame.FramePayload.self, + outboundType: HTTP2Frame.FramePayload.self + ) + ) + } } // Let's try sending some requests try await streamChannel.outboundWriter.write(ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload) diff --git a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift index e9231b49..a041568f 100644 --- a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift +++ b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift @@ -13,6 +13,8 @@ //===----------------------------------------------------------------------===// import XCTest +import Atomics +import NIOConcurrencyHelpers import NIOCore import NIOEmbedded import NIOHTTP1 @@ -220,7 +222,7 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { XCTAssertThrowsError(try self.channel.writeInbound(dataFrame)) { error in XCTAssertEqual(streamID, (error as? NIOHTTP2Errors.NoSuchStream).map { $0.streamID }) } - + self.channel.assertNoFramesReceived() XCTAssertNoThrow(try self.channel.finish()) @@ -980,12 +982,12 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { func testCreatingOutboundChannel() throws { let configurePromise: EventLoopPromise = self.channel.eventLoop.makePromise() - var createdChannelCount = 0 - var configuredChannelCount = 0 - var streamIDs = Array() - let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { _ in + let createdChannelCount = ManagedAtomic(0) + let configuredChannelCount = ManagedAtomic(0) + let streamIDs = NIOLockedValueBox(Array()) + let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { channel in XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) + return channel.eventLoop.makeFailedFuture(MyError()) } XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait()) XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil)) @@ -993,15 +995,17 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { for _ in 0..<3 { let channelPromise: EventLoopPromise = self.channel.eventLoop.makePromise() multiplexer.createStreamChannel(promise: channelPromise) { channel in - createdChannelCount += 1 + _ = createdChannelCount.loadThenWrappingIncrement(ordering: .sequentiallyConsistent) return configurePromise.futureResult } channelPromise.futureResult.whenSuccess { channel in - configuredChannelCount += 1 + _ = configuredChannelCount.loadThenWrappingIncrement(ordering: .sequentiallyConsistent) // Write some headers: the flush will trigger a stream ID to be assigned to the channel. channel.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: [:]))).whenSuccess { channel.getOption(HTTP2StreamChannelOptions.streamID).whenSuccess { streamID in - streamIDs.append(streamID) + streamIDs.withLockedValue { streamIDs in + streamIDs.append(streamID) + } } } } @@ -1010,26 +1014,30 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { // Run the loop to create the channels. self.channel.embeddedEventLoop.run() - XCTAssertEqual(createdChannelCount, 3) - XCTAssertEqual(configuredChannelCount, 0) - XCTAssertEqual(streamIDs.count, 0) + XCTAssertEqual(createdChannelCount.load(ordering: .sequentiallyConsistent), 3) + XCTAssertEqual(configuredChannelCount.load(ordering: .sequentiallyConsistent), 0) + streamIDs.withLockedValue { streamIDs in + XCTAssertEqual(streamIDs.count, 0) + } configurePromise.succeed(()) - XCTAssertEqual(createdChannelCount, 3) - XCTAssertEqual(configuredChannelCount, 3) - XCTAssertEqual(streamIDs, [2, 4, 6].map { HTTP2StreamID($0) }) + XCTAssertEqual(createdChannelCount.load(ordering: .sequentiallyConsistent), 3) + XCTAssertEqual(configuredChannelCount.load(ordering: .sequentiallyConsistent), 3) + streamIDs.withLockedValue { streamIDs in + XCTAssertEqual(streamIDs, [2, 4, 6].map { HTTP2StreamID($0) }) + } XCTAssertNoThrow(try self.channel.finish()) } func testCreatingOutboundChannelClient() throws { let configurePromise: EventLoopPromise = self.channel.eventLoop.makePromise() - var createdChannelCount = 0 - var configuredChannelCount = 0 - var streamIDs = Array() - let multiplexer = HTTP2StreamMultiplexer(mode: .client, channel: self.channel) { _ in + let createdChannelCount = ManagedAtomic(0) + let configuredChannelCount = ManagedAtomic(0) + let streamIDs = NIOLockedValueBox(Array()) + let multiplexer = HTTP2StreamMultiplexer(mode: .client, channel: self.channel) { channel in XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) + return channel.eventLoop.makeFailedFuture(MyError()) } XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait()) XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil)) @@ -1037,15 +1045,17 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { for _ in 0..<3 { let channelPromise: EventLoopPromise = self.channel.eventLoop.makePromise() multiplexer.createStreamChannel(promise: channelPromise) { channel in - createdChannelCount += 1 + _ = createdChannelCount.loadThenWrappingIncrement(ordering: .sequentiallyConsistent) return configurePromise.futureResult } channelPromise.futureResult.whenSuccess { channel in - configuredChannelCount += 1 + _ = configuredChannelCount.loadThenWrappingIncrement(ordering: .sequentiallyConsistent) // Write some headers: the flush will trigger a stream ID to be assigned to the channel. channel.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: [:]))).whenSuccess { channel.getOption(HTTP2StreamChannelOptions.streamID).whenSuccess { streamID in - streamIDs.append(streamID) + streamIDs.withLockedValue { streamIDs in + streamIDs.append(streamID) + } } } } @@ -1054,14 +1064,18 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { // Run the loop to create the channels. self.channel.embeddedEventLoop.run() - XCTAssertEqual(createdChannelCount, 3) - XCTAssertEqual(configuredChannelCount, 0) - XCTAssertEqual(streamIDs.count, 0) + XCTAssertEqual(createdChannelCount.load(ordering: .sequentiallyConsistent), 3) + XCTAssertEqual(configuredChannelCount.load(ordering: .sequentiallyConsistent), 0) + streamIDs.withLockedValue { streamIDs in + XCTAssertEqual(streamIDs.count, 0) + } configurePromise.succeed(()) - XCTAssertEqual(createdChannelCount, 3) - XCTAssertEqual(configuredChannelCount, 3) - XCTAssertEqual(streamIDs, [1, 3, 5].map { HTTP2StreamID($0) }) + XCTAssertEqual(createdChannelCount.load(ordering: .sequentiallyConsistent), 3) + XCTAssertEqual(configuredChannelCount.load(ordering: .sequentiallyConsistent), 3) + streamIDs.withLockedValue { streamIDs in + XCTAssertEqual(streamIDs, [1, 3, 5].map { HTTP2StreamID($0) }) + } XCTAssertNoThrow(try self.channel.finish()) } @@ -1069,24 +1083,26 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { func testWritesOnCreatedChannelAreDelayed() throws { let configurePromise: EventLoopPromise = self.channel.eventLoop.makePromise() let writeRecorder = FrameWriteRecorder() - var childChannel: Channel? = nil + let childChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self) XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil)) - let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { _ in + let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { channel in XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) + return channel.eventLoop.makeFailedFuture(MyError()) } XCTAssertNoThrow(try self.channel.pipeline.addHandler(writeRecorder).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait()) multiplexer.createStreamChannel(promise: nil) { channel in - childChannel = channel + childChannelPromise.succeed(channel) return configurePromise.futureResult } (self.channel.eventLoop as! EmbeddedEventLoop).run() - XCTAssertNotNil(childChannel) - childChannel!.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: HPACKHeaders())), promise: nil) + + let childChannel = try childChannelPromise.futureResult.wait() + childChannel.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: HPACKHeaders())), promise: nil) + XCTAssertEqual(writeRecorder.flushedWrites.count, 0) configurePromise.succeed(()) @@ -1098,26 +1114,37 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { func testWritesAreCancelledOnFailingInitializer() throws { let configurePromise: EventLoopPromise = self.channel.eventLoop.makePromise() - var childChannel: Channel? = nil + let childChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self) - let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { _ in + let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { channel in XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) + return channel.eventLoop.makeFailedFuture(MyError()) } XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait()) multiplexer.createStreamChannel(promise: nil) { channel in - childChannel = channel + childChannelPromise.succeed(channel) return configurePromise.futureResult } (self.channel.eventLoop as! EmbeddedEventLoop).run() - var writeError: Error? = nil - childChannel!.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: HPACKHeaders()))).whenFailure { writeError = $0 } - XCTAssertNil(writeError) + let childChannel = try childChannelPromise.futureResult.wait() + + let writeError = NIOLockedValueBox(nil) + childChannel.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: HPACKHeaders()))).whenFailure { error in + writeError.withLockedValue{ writeError in + writeError = error + } + } + writeError.withLockedValue{ writeError in + XCTAssertNil(writeError) + } configurePromise.fail(MyError()) - XCTAssertNotNil(writeError) - XCTAssertTrue(writeError is MyError) + + writeError.withLockedValue{ writeError in + XCTAssertNotNil(writeError) + XCTAssertTrue(writeError is MyError) + } XCTAssertNoThrow(try self.channel.finish()) } @@ -1252,26 +1279,28 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { XCTAssertNoThrow(try self.channel.pipeline.addHandler(ActiveHandler(activatedPromise: activePromise)).wait()) XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/nothing")).wait()) XCTAssertTrue(didActivate) - + XCTAssertNoThrow(try self.channel.finish()) } func testCreatedChildChannelCanBeClosedImmediately() throws { - var closed = false + let closed = ManagedAtomic(false) let multiplexer = HTTP2StreamMultiplexer(mode: .client, channel: self.channel) { channel in XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) + return channel.eventLoop.makeFailedFuture(MyError()) } XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait()) - XCTAssertFalse(closed) + XCTAssertFalse(closed.load(ordering: .sequentiallyConsistent)) multiplexer.createStreamChannel(promise: nil) { channel in - channel.close().whenComplete { _ in closed = true } + channel.close().whenComplete { _ in + closed.store(true, ordering: .sequentiallyConsistent) + } return channel.eventLoop.makeSucceededFuture(()) } self.channel.embeddedEventLoop.run() - XCTAssertTrue(closed) + XCTAssertTrue(closed.load(ordering: .sequentiallyConsistent)) XCTAssertNoThrow(XCTAssertTrue(try self.channel.finish().isClean)) } @@ -1301,24 +1330,26 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { } func testCreatedChildChannelCanBeClosedImmediatelyWhenBaseIsActive() throws { - var closed = false + let closed = ManagedAtomic(false) // We need to activate the underlying channel here. XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait()) let multiplexer = HTTP2StreamMultiplexer(mode: .client, channel: self.channel) { channel in XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) + return channel.eventLoop.makeFailedFuture(MyError()) } XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait()) - XCTAssertFalse(closed) + XCTAssertFalse(closed.load(ordering: .sequentiallyConsistent)) multiplexer.createStreamChannel(promise: nil) { channel in - channel.close().whenComplete { _ in closed = true } + channel.close().whenComplete { _ in + closed.store(true, ordering: .sequentiallyConsistent) + } return channel.eventLoop.makeSucceededFuture(()) } self.channel.embeddedEventLoop.run() - XCTAssertTrue(closed) + XCTAssertTrue(closed.load(ordering: .sequentiallyConsistent)) XCTAssertNoThrow(XCTAssertTrue(try self.channel.finish().isClean)) } @@ -1955,25 +1986,31 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait()) // Now create two child channels with error recording handlers in them. Save one, ignore the other. - let errorRecorder = ErrorRecorder() - var childChannel: Channel! + let childChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self) multiplexer.createStreamChannel(promise: nil) { channel in - childChannel = channel - return channel.pipeline.addHandler(errorRecorder) + childChannelPromise.succeed(channel) + return channel.pipeline.addHandler(ErrorRecorder()) } - let secondErrorRecorder = ErrorRecorder() + let secondChildChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self) multiplexer.createStreamChannel(promise: nil) { channel in + secondChildChannelPromise.succeed(channel) // For this one we'll do a write immediately, to bring it into existence and give it a stream ID. channel.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: goodHeaders)), promise: nil) - return channel.pipeline.addHandler(secondErrorRecorder) + return channel.pipeline.addHandler(ErrorRecorder()) } self.channel.embeddedEventLoop.run() - // On this child channel, write and flush an invalid headers frame. - XCTAssertEqual(errorRecorder.errors.count, 0) - XCTAssertEqual(secondErrorRecorder.errors.count, 0) + let childChannel = try childChannelPromise.futureResult.wait() + try childChannel.pipeline.handler(type: ErrorRecorder.self).map { errorRecorder in + XCTAssertEqual(errorRecorder.errors.count, 0) + }.wait() + let secondChildChannel = try secondChildChannelPromise.futureResult.wait() + try secondChildChannel.pipeline.handler(type: ErrorRecorder.self).map { errorRecorder in + XCTAssertEqual(errorRecorder.errors.count, 0) + }.wait() + // On this child channel, write and flush an invalid headers frame. childChannel.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: badHeaders)), promise: nil) // Now, synthetically deliver the stream error that should have been produced. @@ -1985,14 +2022,19 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { ) // It should come through to the channel. - XCTAssertEqual( - errorRecorder.errors.first.flatMap { $0 as? NIOHTTP2Errors.ForbiddenHeaderField }, - NIOHTTP2Errors.forbiddenHeaderField(name: "transfer-encoding", value: "chunked") - ) - XCTAssertEqual(secondErrorRecorder.errors.count, 0) + try childChannel.pipeline.handler(type: ErrorRecorder.self).map { errorRecorder in + XCTAssertEqual( + errorRecorder.errors.first.flatMap { $0 as? NIOHTTP2Errors.ForbiddenHeaderField }, + NIOHTTP2Errors.forbiddenHeaderField(name: "transfer-encoding", value: "chunked") + ) + }.wait() + try secondChildChannel.pipeline.handler(type: ErrorRecorder.self).map { errorRecorder in + XCTAssertEqual(errorRecorder.errors.count, 0) + }.wait() // Simulate closing the child channel in response to the error. childChannel.close(promise: nil) + self.channel.embeddedEventLoop.run() // Only the HEADERS frames should have been written: we closed before the other channel became active, so @@ -2003,7 +2045,7 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { frames[0].assertHeadersFrame(endStream: false, streamID: 1, headers: goodHeaders, priority: nil, type: .request) frames[1].assertHeadersFrame(endStream: false, streamID: 3, headers: badHeaders, priority: nil, type: .doNotValidate) } - + func testPendingReadsAreFlushedEvenWithoutUnsatisfiedReadOnChannelInactive() throws { let goodHeaders = HPACKHeaders([ (":path", "/"), (":method", "GET"), (":scheme", "https"), (":authority", "localhost") @@ -2019,62 +2061,73 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait()) // Now create two child channels with error recording handlers in them. Save one, ignore the other. - let consumer = ReadAndFrameConsumer() - var childChannel: Channel! + let childChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self) multiplexer.createStreamChannel(promise: nil) { channel in - childChannel = channel - return channel.pipeline.addHandler(consumer) + childChannelPromise.succeed(channel) + return channel.pipeline.addHandler(ReadAndFrameConsumer()) } self.channel.embeddedEventLoop.run() - + + let childChannel = try childChannelPromise.futureResult.wait() + let streamID = HTTP2StreamID(1) let payload = HTTP2Frame.FramePayload.Headers(headers: goodHeaders, endStream: true) XCTAssertNoThrow(try childChannel.writeAndFlush(HTTP2Frame.FramePayload.headers(payload)).wait()) - + let frames = try self.channel.sentFrames() XCTAssertEqual(frames.count, 1) frames.first?.assertHeadersFrameMatches(this: HTTP2Frame(streamID: streamID, payload: .headers(payload))) - - XCTAssertEqual(consumer.readCount, 1) - - // 1. pass header onwards - - let responseHeaderPayload = HTTP2Frame.FramePayload.headers(.init(headers: [":status": "200"])) - XCTAssertNoThrow(try self.channel.writeInbound(HTTP2Frame(streamID: streamID, payload: responseHeaderPayload))) - XCTAssertEqual(consumer.receivedFrames.count, 1) - XCTAssertEqual(consumer.readCompleteCount, 1) - XCTAssertEqual(consumer.readCount, 2) - - consumer.forwardRead = false - + + try childChannel.pipeline.handler(type: ReadAndFrameConsumer.self).flatMapThrowing { consumer in + XCTAssertEqual(consumer.readCount, 1) + + // 1. pass header onwards + let responseHeaderPayload = HTTP2Frame.FramePayload.headers(.init(headers: [":status": "200"])) + XCTAssertNoThrow(try self.channel.writeInbound(HTTP2Frame(streamID: streamID, payload: responseHeaderPayload))) + + XCTAssertEqual(consumer.receivedFrames.count, 1) + XCTAssertEqual(consumer.readCompleteCount, 1) + XCTAssertEqual(consumer.readCount, 2) + + consumer.forwardRead = false + }.wait() + // 2. pass body onwards - let responseBody1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(.init(string: "foo")))) XCTAssertNoThrow(try self.channel.writeInbound(HTTP2Frame(streamID: streamID, payload: responseBody1))) - XCTAssertEqual(consumer.receivedFrames.count, 2) - XCTAssertEqual(consumer.readCompleteCount, 2) - XCTAssertEqual(consumer.readCount, 3) - XCTAssertEqual(consumer.readPending, true) - + + try childChannel.pipeline.handler(type: ReadAndFrameConsumer.self).flatMapThrowing { consumer in + XCTAssertEqual(consumer.receivedFrames.count, 2) + XCTAssertEqual(consumer.readCompleteCount, 2) + XCTAssertEqual(consumer.readCount, 3) + XCTAssertEqual(consumer.readPending, true) + }.wait() + // 3. pass on more body - should not change a thing, since read is pending in consumer - + let responseBody2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(.init(string: "bar")), endStream: true)) XCTAssertNoThrow(try self.channel.writeInbound(HTTP2Frame(streamID: streamID, payload: responseBody2))) - XCTAssertEqual(consumer.receivedFrames.count, 2) - XCTAssertEqual(consumer.readCompleteCount, 2) - XCTAssertEqual(consumer.readCount, 3) - XCTAssertEqual(consumer.readPending, true) + + try childChannel.pipeline.handler(type: ReadAndFrameConsumer.self).flatMapThrowing { consumer in + XCTAssertEqual(consumer.receivedFrames.count, 2) + XCTAssertEqual(consumer.readCompleteCount, 2) + XCTAssertEqual(consumer.readCount, 3) + XCTAssertEqual(consumer.readPending, true) + + XCTAssertEqual(consumer.channelInactiveCount, 0) + }.wait() // 4. signal stream is closed – this should force forward all pending frames - - XCTAssertEqual(consumer.channelInactiveCount, 0) self.channel.pipeline.fireUserInboundEventTriggered(StreamClosedEvent(streamID: streamID, reason: nil)) - XCTAssertEqual(consumer.receivedFrames.count, 3) - XCTAssertEqual(consumer.readCompleteCount, 3) - XCTAssertEqual(consumer.readCount, 3) - XCTAssertEqual(consumer.channelInactiveCount, 1) - XCTAssertEqual(consumer.readPending, true) + + try childChannel.pipeline.handler(type: ReadAndFrameConsumer.self).flatMapThrowing { consumer in + XCTAssertEqual(consumer.receivedFrames.count, 3) + XCTAssertEqual(consumer.readCompleteCount, 3) + XCTAssertEqual(consumer.readCount, 3) + XCTAssertEqual(consumer.channelInactiveCount, 1) + XCTAssertEqual(consumer.readPending, true) + }.wait() } } @@ -2092,13 +2145,13 @@ final class ErrorRecorder: ChannelInboundHandler { private final class ReadAndFrameConsumer: ChannelInboundHandler, ChannelOutboundHandler { typealias InboundIn = HTTP2Frame.FramePayload typealias OutboundIn = HTTP2Frame.FramePayload - + private(set) var receivedFrames: [HTTP2Frame.FramePayload] = [] private(set) var readCount = 0 private(set) var readCompleteCount = 0 private(set) var channelInactiveCount = 0 private(set) var readPending = false - + var forwardRead = true { didSet { if self.forwardRead, self.readPending { @@ -2107,13 +2160,13 @@ private final class ReadAndFrameConsumer: ChannelInboundHandler, ChannelOutbound } } } - + var context: ChannelHandlerContext! - + func handlerAdded(context: ChannelHandlerContext) { self.context = context } - + func handlerRemoved(context: ChannelHandlerContext) { self.context = context } @@ -2122,12 +2175,12 @@ private final class ReadAndFrameConsumer: ChannelInboundHandler, ChannelOutbound self.receivedFrames.append(self.unwrapInboundIn(data)) context.fireChannelRead(data) } - + func channelReadComplete(context: ChannelHandlerContext) { self.readCompleteCount += 1 context.fireChannelReadComplete() } - + func channelInactive(context: ChannelHandlerContext) { self.channelInactiveCount += 1 context.fireChannelInactive() diff --git a/Tests/NIOHTTP2Tests/HTTP2InlineStreamMultiplexerTests.swift b/Tests/NIOHTTP2Tests/HTTP2InlineStreamMultiplexerTests.swift index 5544377c..f10a0232 100644 --- a/Tests/NIOHTTP2Tests/HTTP2InlineStreamMultiplexerTests.swift +++ b/Tests/NIOHTTP2Tests/HTTP2InlineStreamMultiplexerTests.swift @@ -24,9 +24,9 @@ import NIOHTTP1 private extension Channel { /// Adds a simple no-op ``HTTP2StreamMultiplexer`` to the pipeline. func addNoOpInlineMultiplexer(mode: NIOHTTP2Handler.ParserMode, eventLoop: EventLoop) { - XCTAssertNoThrow(try self.pipeline.addHandler(NIOHTTP2Handler(mode: mode, eventLoop: eventLoop) { channel in + XCTAssertNoThrow(try self.pipeline.addHandler(NIOHTTP2Handler(mode: mode, eventLoop: eventLoop, inboundStreamInitializer: { channel in self.eventLoop.makeSucceededFuture(()) - }).wait()) + })).wait()) } } @@ -136,10 +136,10 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testHeadersFramesCreateNewChannels() throws { let channelCount = ManagedAtomic(0) - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in channelCount.wrappingIncrement(ordering: .sequentiallyConsistent) return channel.close() - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -156,7 +156,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testChannelsCloseThemselvesWhenToldTo() throws { let completedChannelCount = ManagedAtomic(0) - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in channel.closeFuture.whenSuccess { completedChannelCount.wrappingIncrement(ordering: .sequentiallyConsistent) } return channel.pipeline.addHandler(TestHookHandler { context, payload in guard case .headers(let requestHeaders) = payload else { @@ -167,7 +167,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let headers = HTTP2Frame.FramePayload.headers(.init(headers: .basicResponseHeaders, endStream: true)) context.writeAndFlush(NIOAny(headers), promise: nil) }) - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -196,7 +196,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let frame = HTTP2Frame(streamID: streamID, payload: .headers(.init(headers: .basicRequestHeaders, endStream: true))) let rstStreamFrame = HTTP2Frame(streamID: streamID, payload: .rstStream(.cancel)) - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in closeError.withLockedValue { closeError in XCTAssertNil(closeError) } @@ -206,7 +206,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { } } return channel.pipeline.addHandler(FramePayloadExpecter(expectedPayload: [frame.payload, rstStreamFrame.payload])) - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -235,10 +235,10 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let frame = HTTP2Frame(streamID: streamID, payload: .headers(.init(headers: .basicRequestHeaders, endStream: true))) let goAwayFrame = HTTP2Frame(streamID: .rootStream, payload: .goAway(lastStreamID: .rootStream, errorCode: .http11Required, opaqueData: nil)) - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in XCTFail() return channel.eventLoop.makeSucceededVoidFuture() - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup(mode: .client)) @@ -269,9 +269,9 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testClosingIdleChannels() throws { let frameReceiver = IODataWriteRecorder() - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in return channel.close() - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(frameReceiver).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -301,10 +301,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testClosingActiveChannels() throws { let frameReceiver = IODataWriteRecorder() let channelPromise: EventLoopPromise = self.channel.eventLoop.makePromise() - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in - channelPromise.succeed(channel) - return channel.eventLoop.makeSucceededFuture(()) - } + let http2Handler = NIOHTTP2Handler( + mode: .server, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + channelPromise.succeed(channel) + return channel.eventLoop.makeSucceededVoidFuture() + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(frameReceiver).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -337,10 +341,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testClosePromiseIsSatisfied() throws { let frameReceiver = IODataWriteRecorder() let channelPromise: EventLoopPromise = self.channel.eventLoop.makePromise() - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in - channelPromise.succeed(channel) - return channel.eventLoop.makeSucceededFuture(()) - } + let http2Handler = NIOHTTP2Handler( + mode: .server, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + channelPromise.succeed(channel) + return channel.eventLoop.makeSucceededVoidFuture() + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(frameReceiver).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -373,10 +381,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testMultipleClosePromisesAreSatisfied() throws { let frameReceiver = IODataWriteRecorder() let channelPromise: EventLoopPromise = self.channel.eventLoop.makePromise() - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in - channelPromise.succeed(channel) - return channel.eventLoop.makeSucceededFuture(()) - } + let http2Handler = NIOHTTP2Handler( + mode: .server, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + channelPromise.succeed(channel) + return channel.eventLoop.makeSucceededVoidFuture() + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(frameReceiver).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -428,10 +440,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testClosePromiseFailsWithError() throws { let frameReceiver = IODataWriteRecorder() let channelPromise: EventLoopPromise = self.channel.eventLoop.makePromise() - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in - channelPromise.succeed(channel) - return channel.eventLoop.makeSucceededFuture(()) - } + let http2Handler = NIOHTTP2Handler( + mode: .server, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + channelPromise.succeed(channel) + return channel.eventLoop.makeSucceededVoidFuture() + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(frameReceiver).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -464,12 +480,12 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testFramesAreNotDeliveredUntilStreamIsSetUp() throws { let channelPromise: EventLoopPromise = self.channel.eventLoop.makePromise() let setupCompletePromise: EventLoopPromise = self.channel.eventLoop.makePromise() - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in channelPromise.succeed(channel) return channel.pipeline.addHandler(InboundFramePayloadRecorder()).flatMap { setupCompletePromise.futureResult } - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -515,12 +531,12 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let writeRecorder = IODataWriteRecorder() let channelPromise: EventLoopPromise = self.channel.eventLoop.makePromise() let setupCompletePromise: EventLoopPromise = self.channel.eventLoop.makePromise() - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in channelPromise.succeed(channel) return channel.pipeline.addHandler(InboundFramePayloadRecorder()).flatMap { setupCompletePromise.futureResult } - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(writeRecorder).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -579,12 +595,16 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testFlushingOneChannelDoesntFlushThemAll() throws { let writeTracker = IODataWriteRecorder() let channels = NIOLockedValueBox<[Channel]>([]) - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in - channels.withLockedValue { channels in - channels.append(channel) + let http2Handler = NIOHTTP2Handler( + mode: .server, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + channels.withLockedValue { channels in + channels.append(channel) + } + return channel.eventLoop.makeSucceededVoidFuture() } - return channel.eventLoop.makeSucceededFuture(()) - } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(writeTracker).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -629,12 +649,12 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testUnflushedWritesFailOnError() throws { let childChannel = NIOLockedValueBox(nil) - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in childChannel.withLockedValue { childChannel in childChannel = channel } return channel.eventLoop.makeSucceededVoidFuture() - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -665,7 +685,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testWritesFailOnClosedStreamChannels() throws { let childChannel = NIOLockedValueBox(nil) - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in childChannel.withLockedValue { childChannel in childChannel = channel } @@ -678,7 +698,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let headers = HTTP2Frame.FramePayload.headers(.init(headers: .basicResponseHeaders, endStream: true)) context.writeAndFlush(NIOAny(headers), promise: nil) }) - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -944,7 +964,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let handlerRemovedPromise: EventLoopPromise = self.channel.eventLoop.makePromise() handlerRemovedPromise.futureResult.whenComplete { _ in handlerRemoved = true } - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in return channel.pipeline.addHandlers([ HandlerRemovedHandler(removedPromise: handlerRemovedPromise), TestHookHandler { context, payload in @@ -956,7 +976,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let headers = HTTP2Frame.FramePayload.headers(.init(headers: .basicResponseHeaders, endStream: true)) context.writeAndFlush(NIOAny(headers), promise: nil) }]) - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -1016,10 +1036,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let createdChannelCount = ManagedAtomic(0) var configuredChannelCount = 0 var streamIDs = Array() - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, eventLoop: + self.channel.eventLoop, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil)) // to make the channel active @@ -1063,10 +1087,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil)) - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, eventLoop: + self.channel.eventLoop, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(writeRecorder).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) (self.channel.eventLoop as! EmbeddedEventLoop).run() @@ -1098,10 +1126,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let configurePromise: EventLoopPromise = self.channel.eventLoop.makePromise() let childChannel = NIOLockedValueBox(nil) - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .server, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) let multiplexer = try http2Handler.multiplexer.wait() multiplexer.createStreamChannel(promise: nil) { channel in @@ -1129,10 +1161,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let configurePromise: EventLoopPromise = self.channel.eventLoop.makePromise() let writeRecorder = FrameWriteRecorder() - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .server, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(writeRecorder).wait()) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) let multiplexer = try http2Handler.multiplexer.wait() @@ -1158,10 +1194,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { XCTFail("Activation promise must not fail") } - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .server, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) let multiplexer = try http2Handler.multiplexer.wait() multiplexer.createStreamChannel(promise: nil) { channel in @@ -1188,10 +1228,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { XCTFail("Activation promise must not fail") } - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .server, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 8765)).wait()) @@ -1220,9 +1264,9 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { XCTFail("Activation promise must not fail") } - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in return channel.pipeline.addHandler(activeRecorder) - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) self.channel.pipeline.fireChannelActive() @@ -1267,10 +1311,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testCreatedChildChannelCanBeClosedImmediately() throws { let closed = ManagedAtomic(false) - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { channel in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertFalse(closed.load(ordering: .sequentiallyConsistent)) @@ -1287,10 +1335,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testCreatedChildChannelCanBeClosedBeforeWritingHeaders() throws { var closed = false - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { channel in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) let channelPromise = self.channel.eventLoop.makePromise(of: Channel.self) @@ -1316,10 +1368,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { // We need to activate the underlying channel here. XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait()) - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { channel in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertEqual(try self.channel.readAllBuffers().count, 2) // magic & settings @@ -1341,10 +1397,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { // We need to activate the underlying channel here. XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait()) - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { channel in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertEqual(try self.channel.readAllBuffers().count, 2) // magic & settings @@ -1372,9 +1432,9 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { XCTAssertNoThrow(try self.channel.pipeline.addHandler(flushCounter).wait()) // Add a server-mode multiplexer that will add an auto-response handler. - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in channel.pipeline.addHandler(QuickFramePayloadResponseHandler()) - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -1504,10 +1564,15 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { var streamConfiguration = NIOHTTP2Handler.StreamConfiguration() streamConfiguration.outboundBufferSizeHighWatermark = 100 streamConfiguration.outboundBufferSizeLowWatermark = 50 - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop, streamConfiguration: streamConfiguration) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, + eventLoop: self.channel.eventLoop, + streamConfiguration: streamConfiguration, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) // We need to activate the underlying channel here. @@ -1557,10 +1622,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { } func testMultiplexerModifiesStreamChannelWritabilityBasedOnParentChannelWritability() throws { - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, eventLoop: + self.channel.eventLoop, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) // We need to activate the underlying channel here. @@ -1603,10 +1672,15 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { var streamConfiguration = NIOHTTP2Handler.StreamConfiguration() streamConfiguration.outboundBufferSizeHighWatermark = 100 streamConfiguration.outboundBufferSizeLowWatermark = 50 - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop, streamConfiguration: streamConfiguration) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, + eventLoop: self.channel.eventLoop, + streamConfiguration: streamConfiguration, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) // We need to activate the underlying channel here. @@ -1672,10 +1746,15 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { var streamConfiguration = NIOHTTP2Handler.StreamConfiguration() streamConfiguration.outboundBufferSizeHighWatermark = 100 streamConfiguration.outboundBufferSizeLowWatermark = 50 - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop, streamConfiguration: streamConfiguration) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, + eventLoop: self.channel.eventLoop, + streamConfiguration: streamConfiguration, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) // We need to activate the underlying channel here. @@ -1736,7 +1815,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { } func testStreamChannelSupportsSyncOptions() throws { - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in XCTAssert(channel is HTTP2StreamChannel) if let sync = channel.syncOptions { do { @@ -1754,7 +1833,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { } return channel.close() - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -1769,9 +1848,9 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { var badHeaders = goodHeaders badHeaders.add(name: "transfer-encoding", value: "chunked") - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { channel in + let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop, inboundStreamInitializer: { channel in return channel.eventLoop.makeSucceededFuture(()) - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup(mode: .client)) XCTAssertEqual(try self.channel.readAllBuffers().count, 3) // drain outbound magic, settings & ACK @@ -1832,10 +1911,14 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { (":path", "/"), (":method", "GET"), (":scheme", "https"), (":authority", "localhost") ]) - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop) { channel in - XCTFail("Server push is unexpected") - return channel.eventLoop.makeSucceededFuture(()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, + eventLoop: self.channel.eventLoop, + inboundStreamInitializer: { channel in + XCTFail("Server push is unexpected") + return channel.eventLoop.makeSucceededFuture(()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup(mode: .client)) XCTAssertEqual(try self.channel.readAllBuffers().count, 3) // drain outbound magic, settings & ACK @@ -1939,7 +2022,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { func testDelegateReceivesCreationAndCloseNotifications() throws { let streamDelegate = CountingStreamDelegate() let completedChannelCount = ManagedAtomic(0) - let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, streamDelegate: streamDelegate) { channel in + let http2Handler = NIOHTTP2Handler(mode: .server, eventLoop: self.channel.eventLoop, streamDelegate: streamDelegate, inboundStreamInitializer: { channel in channel.closeFuture.whenSuccess { completedChannelCount.wrappingIncrement(ordering: .sequentiallyConsistent) } return channel.pipeline.addHandler(TestHookHandler { context, payload in if case .headers(let requestHeaders) = payload { @@ -1949,7 +2032,7 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { context.writeAndFlush(NIOAny(headers), promise: nil) } }) - } + }) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) XCTAssertNoThrow(try connectionSetup()) @@ -1990,10 +2073,15 @@ final class HTTP2InlineStreamMultiplexerTests: XCTestCase { let createdChannelCount = ManagedAtomic(0) var configuredChannelCount = 0 var streamIDs = Array() - let http2Handler = NIOHTTP2Handler(mode: .client, eventLoop: self.channel.eventLoop, streamDelegate: streamDelegate) { _ in - XCTFail("Must not be called") - return self.channel.eventLoop.makeFailedFuture(MyError()) - } + let http2Handler = NIOHTTP2Handler( + mode: .client, + eventLoop: self.channel.eventLoop, + streamDelegate: streamDelegate, + inboundStreamInitializer: { _ in + XCTFail("Must not be called") + return self.channel.eventLoop.makeFailedFuture(MyError()) + } + ) XCTAssertNoThrow(try self.channel.pipeline.addHandler(http2Handler).wait()) try connectionSetup(mode: .client) diff --git a/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift b/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift index 70b8ea84..f195da62 100644 --- a/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift +++ b/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift @@ -638,8 +638,9 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase { // Now we're going to send a request, including a very large body: 65536 bytes in size. To avoid spending too much // time initializing buffers, we're going to send the same 1kB data frame 64 times. let headers = HPACKHeaders([(":path", "/"), (":method", "POST"), (":scheme", "https"), (":authority", "localhost")]) - var requestBody = self.clientChannel.allocator.buffer(capacity: 1024) - requestBody.writeBytes(Array(repeating: UInt8(0x04), count: 1024)) + var _requestBody = self.clientChannel.allocator.buffer(capacity: 1024) + _requestBody.writeBytes(Array(repeating: UInt8(0x04), count: 1024)) + let requestBody = _requestBody // We're going to open a stream and queue up the frames for that stream. let handler = try self.clientChannel.pipeline.context(handlerType: HTTP2StreamMultiplexer.self).wait().handler as! HTTP2StreamMultiplexer @@ -1796,7 +1797,7 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase { // Here we send a large response: 65535 bytes in size. let responseHeaders = HPACKHeaders([(":status", "200"), ("content-length", "65535")]) - var responseBody = self.clientChannel.allocator.buffer(capacity: 65535) + var responseBody = channel.allocator.buffer(capacity: 65535) responseBody.writeBytes(Array(repeating: UInt8(0x04), count: 65535)) let respFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders)) @@ -1814,11 +1815,10 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase { // We're going to open a stream and queue up the frames for that stream. let handler = try self.clientChannel.pipeline.handler(type: HTTP2StreamMultiplexer.self).wait() - var reqFrame: HTTP2Frame.FramePayload? = nil + let reqFrame = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true)) handler.createStreamChannel(promise: nil) { channel in // We need END_STREAM set here, because that will force the stream to be closed on the response. - reqFrame = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true)) channel.writeAndFlush(reqFrame, promise: nil) return channel.eventLoop.makeSucceededFuture(()) } @@ -1831,7 +1831,7 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase { try self.serverChannel.assertReceivedFrame().assertWindowUpdateFrame(streamID: 0, windowIncrement: 65535) // And only the request frame frame for the child stream, as there was no need to open its stream window. - childHandler.receivedFrames.assertFramePayloadsMatch([reqFrame!]) + childHandler.receivedFrames.assertFramePayloadsMatch([reqFrame]) // No other frames should be emitted, though the client may have many in a child stream. self.serverChannel.assertNoFramesReceived()