diff --git a/Package.swift b/Package.swift index 008de4f828..59a3e03a51 100644 --- a/Package.swift +++ b/Package.swift @@ -196,7 +196,8 @@ let package = Package( "NIOConcurrencyHelpers", "CNIOLLHTTP", swiftCollections, - ] + ], + swiftSettings: strictConcurrencySettings ), .target( name: "NIOWebSocket", diff --git a/Sources/NIOHTTP1/HTTPPipelineSetup.swift b/Sources/NIOHTTP1/HTTPPipelineSetup.swift index 5e935123ce..a7645377d6 100644 --- a/Sources/NIOHTTP1/HTTPPipelineSetup.swift +++ b/Sources/NIOHTTP1/HTTPPipelineSetup.swift @@ -22,6 +22,10 @@ public typealias NIOHTTPClientUpgradeConfiguration = ( upgraders: [NIOHTTPClientProtocolUpgrader], completionHandler: @Sendable (ChannelHandlerContext) -> Void ) +public typealias NIOHTTPClientUpgradeSendableConfiguration = ( + upgraders: [NIOHTTPClientProtocolUpgrader & Sendable], completionHandler: @Sendable (ChannelHandlerContext) -> Void +) + /// Configuration required to configure a HTTP server pipeline for upgrade. /// /// See the documentation for `HTTPServerUpgradeHandler` for details on these @@ -33,6 +37,10 @@ public typealias NIOHTTPServerUpgradeConfiguration = ( upgraders: [HTTPServerProtocolUpgrader], completionHandler: @Sendable (ChannelHandlerContext) -> Void ) +public typealias NIOHTTPServerUpgradeSendableConfiguration = ( + upgraders: [HTTPServerProtocolUpgrader & Sendable], completionHandler: @Sendable (ChannelHandlerContext) -> Void +) + extension ChannelPipeline { /// Configure a `ChannelPipeline` for use as a HTTP client. /// @@ -67,7 +75,7 @@ extension ChannelPipeline { public func addHTTPClientHandlers( position: Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, - withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? + withClientUpgrade upgrade: NIOHTTPClientUpgradeSendableConfiguration? ) -> EventLoopFuture { self._addHTTPClientHandlers( position: position, @@ -79,7 +87,7 @@ extension ChannelPipeline { private func _addHTTPClientHandlers( position: Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, - withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? + withClientUpgrade upgrade: NIOHTTPClientUpgradeSendableConfiguration? ) -> EventLoopFuture { let future: EventLoopFuture @@ -120,11 +128,12 @@ extension ChannelPipeline { /// the upgrade completion handler. See the documentation on ``NIOHTTPClientUpgradeHandler`` /// for more details. /// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured. + @preconcurrency public func addHTTPClientHandlers( position: Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, enableOutboundHeaderValidation: Bool = true, - withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil + withClientUpgrade upgrade: NIOHTTPClientUpgradeSendableConfiguration? = nil ) -> EventLoopFuture { let future: EventLoopFuture @@ -168,12 +177,13 @@ extension ChannelPipeline { /// the upgrade completion handler. See the documentation on ``NIOHTTPClientUpgradeHandler`` /// for more details. /// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured. + @preconcurrency public func addHTTPClientHandlers( position: Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, enableOutboundHeaderValidation: Bool = true, encoderConfiguration: HTTPRequestEncoder.Configuration = .init(), - withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil + withClientUpgrade upgrade: NIOHTTPClientUpgradeSendableConfiguration? = nil ) -> EventLoopFuture { let future: EventLoopFuture @@ -234,7 +244,7 @@ extension ChannelPipeline { public func configureHTTPServerPipeline( position: ChannelPipeline.Position = .last, withPipeliningAssistance pipelining: Bool = true, - withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withServerUpgrade upgrade: NIOHTTPServerUpgradeSendableConfiguration? = nil, withErrorHandling errorHandling: Bool = true ) -> EventLoopFuture { self._configureHTTPServerPipeline( @@ -274,10 +284,11 @@ extension ChannelPipeline { /// - headerValidation: Whether to validate outbound request headers to confirm that they meet /// spec compliance. Defaults to `true`. /// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured. + @preconcurrency public func configureHTTPServerPipeline( position: ChannelPipeline.Position = .last, withPipeliningAssistance pipelining: Bool = true, - withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withServerUpgrade upgrade: NIOHTTPServerUpgradeSendableConfiguration? = nil, withErrorHandling errorHandling: Bool = true, withOutboundHeaderValidation headerValidation: Bool = true ) -> EventLoopFuture { @@ -320,10 +331,11 @@ extension ChannelPipeline { /// spec compliance. Defaults to `true`. /// - encoderConfiguration: The configuration for the ``HTTPResponseEncoder``. /// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured. + @preconcurrency public func configureHTTPServerPipeline( position: ChannelPipeline.Position = .last, withPipeliningAssistance pipelining: Bool = true, - withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withServerUpgrade upgrade: NIOHTTPServerUpgradeSendableConfiguration? = nil, withErrorHandling errorHandling: Bool = true, withOutboundHeaderValidation headerValidation: Bool = true, withEncoderConfiguration encoderConfiguration: HTTPResponseEncoder.Configuration = .init() @@ -341,7 +353,7 @@ extension ChannelPipeline { private func _configureHTTPServerPipeline( position: ChannelPipeline.Position = .last, withPipeliningAssistance pipelining: Bool = true, - withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withServerUpgrade upgrade: NIOHTTPServerUpgradeSendableConfiguration? = nil, withErrorHandling errorHandling: Bool = true, withOutboundHeaderValidation headerValidation: Bool = true, withEncoderConfiguration encoderConfiguration: HTTPResponseEncoder.Configuration = .init() diff --git a/Sources/NIOHTTP1/HTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/HTTPServerUpgradeHandler.swift index 464baeec36..75fea7d049 100644 --- a/Sources/NIOHTTP1/HTTPServerUpgradeHandler.swift +++ b/Sources/NIOHTTP1/HTTPServerUpgradeHandler.swift @@ -180,28 +180,26 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha // We'll attempt to upgrade. This may take a while, so while we're waiting more data can come in. self.upgradeState = .awaitingUpgrader - let eventLoop = context.eventLoop - let loopBoundContext = context.loopBound self.handleUpgrade(context: context, request: request, requestedProtocols: requestedProtocols) - .hop(to: eventLoop) // the user might return a future from another EventLoop. .whenSuccess { callback in - eventLoop.assertInEventLoop() if let callback = callback { self.gotUpgrader(upgrader: callback) } else { - self.notUpgrading(context: loopBoundContext.value, data: requestPart) + self.notUpgrading(context: context, data: requestPart) } } } /// The core of the upgrade handling logic. /// - /// - Returns: An `EventLoopFuture` that will contain a callback to invoke if upgrade is requested, or nil if upgrade has failed. Never returns a failed future. + /// - Returns: An isolated `EventLoopFuture` that will contain a callback to invoke if upgrade is requested, + /// or nil if upgrade has failed. Never returns a failed future. private func handleUpgrade( context: ChannelHandlerContext, request: HTTPRequestHead, requestedProtocols: [String] - ) -> EventLoopFuture<(() -> Void)?> { + ) -> EventLoopFuture<(() -> Void)?>.Isolated { + let connectionHeader = Set(request.headers[canonicalForm: "connection"].map { $0.lowercased() }) let allHeaderNames = Set(request.headers.map { $0.name.lowercased() }) @@ -219,18 +217,21 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha /// Attempt to upgrade a single protocol. /// /// Will recurse through `protocolIterator` if upgrade fails. + /// + /// - Returns: An isolated `EventLoopFuture` that will contain a callback to invoke if upgrade is requested, + /// or nil if upgrade has failed. Never returns a failed future. private func handleUpgradeForProtocol( context: ChannelHandlerContext, protocolIterator: Array.Iterator, request: HTTPRequestHead, allHeaderNames: Set, connectionHeader: Set - ) -> EventLoopFuture<(() -> Void)?> { + ) -> EventLoopFuture<(() -> Void)?>.Isolated { // We want a local copy of the protocol iterator. We'll pass it to the next invocation of the function. var protocolIterator = protocolIterator guard let proto = protocolIterator.next() else { // We're done! No suitable protocol for upgrade. - return context.eventLoop.makeSucceededFuture(nil) + return context.eventLoop.makeSucceededIsolatedFuture(nil) } guard let upgrader = self.upgraders[proto.lowercased()] else { @@ -256,66 +257,67 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha let responseHeaders = self.buildUpgradeHeaders(protocol: proto) let pipeline = context.pipeline - let loopBoundContext = context.loopBound + return upgrader.buildUpgradeResponse( channel: context.channel, upgradeRequest: request, initialResponseHeaders: responseHeaders - ).map { finalResponseHeaders in - { - // Ok, we're upgrading. - self.upgradeState = .upgrading - - // Before we finish the upgrade we have to remove the HTTPDecoder and any other non-Encoder HTTP - // handlers from the pipeline, to prevent them parsing any more data. We'll buffer the data until - // that completes. - // While there are a lot of Futures involved here it's quite possible that all of this code will - // actually complete synchronously: we just want to program for the possibility that it won't. - // Once that's done, we send the upgrade response, then remove the HTTP encoder, then call the - // internal handler, then call the user code, and then finally when the user code is done we do - // our final cleanup steps, namely we replay the received data we buffered in the meantime and - // then remove ourselves from the pipeline. - self.removeExtraHandlers(pipeline: pipeline).flatMap { - self.sendUpgradeResponse( - context: loopBoundContext.value, - upgradeRequest: request, - responseHeaders: finalResponseHeaders - ) - }.flatMap { - pipeline.syncOperations.removeHandler(self.httpEncoder) - }.flatMap { () -> EventLoopFuture in - let context = loopBoundContext.value - self.upgradeCompletionHandler(context) - return upgrader.upgrade(context: context, upgradeRequest: request) - }.whenComplete { result in - let context = loopBoundContext.value - switch result { - case .success: - context.fireUserInboundEventTriggered( - HTTPServerUpgradeEvents.upgradeComplete(toProtocol: proto, upgradeRequest: request) - ) - self.upgradeState = .upgradeComplete - // When we remove ourselves we'll be delivering any buffered data. - context.pipeline.syncOperations.removeHandler(context: context, promise: nil) - - case .failure(let error): - // Remain in the '.upgrading' state. - context.fireErrorCaught(error) - } + ).hop(to: context.eventLoop) + .assumeIsolated() + .map { finalResponseHeaders in + { + // Ok, we're upgrading. + self.upgradeState = .upgrading + + // Before we finish the upgrade we have to remove the HTTPDecoder and any other non-Encoder HTTP + // handlers from the pipeline, to prevent them parsing any more data. We'll buffer the data until + // that completes. + // While there are a lot of Futures involved here it's quite possible that all of this code will + // actually complete synchronously: we just want to program for the possibility that it won't. + // Once that's done, we send the upgrade response, then remove the HTTP encoder, then call the + // internal handler, then call the user code, and then finally when the user code is done we do + // our final cleanup steps, namely we replay the received data we buffered in the meantime and + // then remove ourselves from the pipeline. + self.removeExtraHandlers(pipeline: pipeline) + .assumeIsolated() + .flatMap { + self.sendUpgradeResponse( + context: context, + upgradeRequest: request, + responseHeaders: finalResponseHeaders + ) + }.flatMap { + pipeline.syncOperations.removeHandler(self.httpEncoder) + }.flatMap { () -> EventLoopFuture in + self.upgradeCompletionHandler(context) + return upgrader.upgrade(context: context, upgradeRequest: request) + }.whenComplete { result in + switch result { + case .success: + context.fireUserInboundEventTriggered( + HTTPServerUpgradeEvents.upgradeComplete(toProtocol: proto, upgradeRequest: request) + ) + self.upgradeState = .upgradeComplete + // When we remove ourselves we'll be delivering any buffered data. + context.pipeline.syncOperations.removeHandler(context: context, promise: nil) + + case .failure(let error): + // Remain in the '.upgrading' state. + context.fireErrorCaught(error) + } + } } + }.flatMapError { error in + // No upgrade here. We want to fire the error down the pipeline, and then try another loop iteration. + context.fireErrorCaught(error) + return self.handleUpgradeForProtocol( + context: context, + protocolIterator: protocolIterator, + request: request, + allHeaderNames: allHeaderNames, + connectionHeader: connectionHeader + ) } - }.flatMapError { error in - // No upgrade here. We want to fire the error down the pipeline, and then try another loop iteration. - let context = loopBoundContext.value - context.fireErrorCaught(error) - return self.handleUpgradeForProtocol( - context: context, - protocolIterator: protocolIterator, - request: request, - allHeaderNames: allHeaderNames, - connectionHeader: connectionHeader - ) - } } private func gotUpgrader(upgrader: @escaping (() -> Void)) { @@ -379,7 +381,7 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha } return .andAllSucceed( - self.extraHTTPHandlers.map { pipeline.removeHandler($0) }, + self.extraHTTPHandlers.map { pipeline.syncOperations.removeHandler($0) }, on: pipeline.eventLoop ) } diff --git a/Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift b/Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift index 0285c07ea7..45de6bfe9b 100644 --- a/Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift +++ b/Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift @@ -18,7 +18,7 @@ import NIOCore /// Configuration for an upgradable HTTP pipeline. @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public struct NIOUpgradableHTTPServerPipelineConfiguration { +public struct NIOUpgradableHTTPServerPipelineConfiguration: Sendable { /// Whether to provide assistance handling HTTP clients that pipeline /// their requests. Defaults to `true`. If `false`, users will need to handle clients that pipeline themselves. public var enablePipelining = true @@ -146,7 +146,7 @@ extension ChannelPipeline.SynchronousOperations { /// Configuration for an upgradable HTTP pipeline. @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public struct NIOUpgradableHTTPClientPipelineConfiguration { +public struct NIOUpgradableHTTPClientPipelineConfiguration: Sendable { /// The strategy to use when dealing with leftover bytes after removing the ``HTTPDecoder`` from the pipeline. public var leftOverBytesStrategy = RemoveAfterUpgradeStrategy.dropBytes diff --git a/Sources/NIOHTTP1/NIOHTTPClientUpgradeHandler.swift b/Sources/NIOHTTP1/NIOHTTPClientUpgradeHandler.swift index 66a7b1a17a..0d886e2ddd 100644 --- a/Sources/NIOHTTP1/NIOHTTPClientUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOHTTPClientUpgradeHandler.swift @@ -326,21 +326,19 @@ public final class NIOHTTPClientUpgradeHandler: ChannelDuplexHandler, RemovableC // upgrader code is done, we do our final cleanup steps, namely we replay the received data we // buffered in the meantime and then remove ourselves from the pipeline. let pipeline = context.pipeline - let loopBoundContext = context.loopBound return { self.upgradeState = .upgrading self.removeHTTPHandlers(pipeline: pipeline) .map { // Let the other handlers be removed before continuing with upgrade. - self.upgradeCompletionHandler(loopBoundContext.value) + self.upgradeCompletionHandler(context) self.upgradeState = .upgradingAddingHandlers } .flatMap { - upgrader.upgrade(context: loopBoundContext.value, upgradeResponse: response) + upgrader.upgrade(context: context, upgradeResponse: response) } .map { - let context = loopBoundContext.value // We unbuffer any buffered data here. // If we received any, we fire readComplete. @@ -359,20 +357,19 @@ public final class NIOHTTPClientUpgradeHandler: ChannelDuplexHandler, RemovableC self.upgradeState = .upgradeComplete } .whenComplete { _ in - let context = loopBoundContext.value context.pipeline.syncOperations.removeHandler(context: context, promise: nil) } } } /// Removes any extra HTTP-related handlers from the channel pipeline. - private func removeHTTPHandlers(pipeline: ChannelPipeline) -> EventLoopFuture { + private func removeHTTPHandlers(pipeline: ChannelPipeline) -> EventLoopFuture.Isolated { guard self.httpHandlers.count > 0 else { - return pipeline.eventLoop.makeSucceededFuture(()) + return pipeline.eventLoop.makeSucceededIsolatedFuture(()) } - let removeFutures = self.httpHandlers.map { pipeline.removeHandler($0) } - return .andAllSucceed(removeFutures, on: pipeline.eventLoop) + let removeFutures = self.httpHandlers.map { pipeline.syncOperations.removeHandler($0) } + return EventLoopFuture.andAllSucceed(removeFutures, on: pipeline.eventLoop).assumeIsolated() } private func gotUpgrader(upgrader: @escaping (() -> Void)) { diff --git a/Sources/NIOHTTP1/NIOTypedHTTPClientUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPClientUpgradeHandler.swift index 29764820a3..b13723b5fc 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPClientUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPClientUpgradeHandler.swift @@ -17,7 +17,8 @@ import NIOCore /// An object that implements `NIOTypedHTTPClientProtocolUpgrader` knows how to handle HTTP upgrade to /// a protocol on a client-side channel. /// It has the option of denying this upgrade based upon the server response. -public protocol NIOTypedHTTPClientProtocolUpgrader { +@preconcurrency +public protocol NIOTypedHTTPClientProtocolUpgrader: Sendable { associatedtype UpgradeResult: Sendable /// The protocol this upgrader knows how to support. @@ -42,7 +43,7 @@ public protocol NIOTypedHTTPClientProtocolUpgrader { /// The upgrade configuration for the ``NIOTypedHTTPClientUpgradeHandler``. @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public struct NIOTypedHTTPClientUpgradeConfiguration { +public struct NIOTypedHTTPClientUpgradeConfiguration: Sendable { /// The initial request head that is sent out once the channel becomes active. public var upgradeRequestHead: HTTPRequestHead @@ -191,18 +192,17 @@ public final class NIOTypedHTTPClientUpgradeHandler: Ch } private func channelRead(context: ChannelHandlerContext, responsePart: HTTPClientResponsePart) { - let loopBoundContext = context.loopBound switch self.stateMachine.channelReadResponsePart(responsePart) { case .fireErrorCaughtAndRemoveHandler(let error): self.upgradeResultPromise.fail(error) context.fireErrorCaught(error) - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) case .runNotUpgradingInitializer: self.notUpgradingCompletionHandler(context.channel) .hop(to: context.eventLoop) + .assumeIsolated() .whenComplete { result in - let context = loopBoundContext.value self.upgradingHandlerCompleted(context: context, result) } @@ -226,13 +226,12 @@ public final class NIOTypedHTTPClientUpgradeHandler: Ch // Before we start the upgrade we have to remove the HTTPEncoder and HTTPDecoder handlers from the // pipeline, to prevent them parsing any more data. We'll buffer the incoming data until that completes. let channel = context.channel - let loopBoundContext = context.loopBound self.removeHTTPHandlers(pipeline: context.pipeline) .flatMap { upgrader.upgrade(channel: channel, upgradeResponse: responseHead) }.hop(to: context.eventLoop) + .assumeIsolated() .whenComplete { result in - let context = loopBoundContext.value self.upgradingHandlerCompleted(context: context, result) } } @@ -245,7 +244,7 @@ public final class NIOTypedHTTPClientUpgradeHandler: Ch case .fireErrorCaughtAndRemoveHandler(let error): self.upgradeResultPromise.fail(error) context.fireErrorCaught(error) - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) case .fireErrorCaughtAndStartUnbuffering(let error): self.upgradeResultPromise.fail(error) @@ -258,7 +257,7 @@ public final class NIOTypedHTTPClientUpgradeHandler: Ch case .removeHandler(let value): self.upgradeResultPromise.succeed(value) - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) case .none: break @@ -273,7 +272,7 @@ public final class NIOTypedHTTPClientUpgradeHandler: Ch case .fireChannelReadCompleteAndRemoveHandler: context.fireChannelReadComplete() - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) return } } @@ -285,8 +284,12 @@ public final class NIOTypedHTTPClientUpgradeHandler: Ch return pipeline.eventLoop.makeSucceededFuture(()) } - let removeFutures = self.httpHandlers.map { pipeline.removeHandler($0) } + let removeFutures = self.httpHandlers.map { pipeline.syncOperations.removeHandler($0) } return .andAllSucceed(removeFutures, on: pipeline.eventLoop) } } + +@available(*, unavailable) +extension NIOTypedHTTPClientUpgradeHandler: Sendable {} + #endif diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift index 77fb961b0d..3481d752c5 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift @@ -16,7 +16,8 @@ import NIOCore /// An object that implements `NIOTypedHTTPServerProtocolUpgrader` knows how to handle HTTP upgrade to /// a protocol on a server-side channel. -public protocol NIOTypedHTTPServerProtocolUpgrader { +@preconcurrency +public protocol NIOTypedHTTPServerProtocolUpgrader: Sendable { associatedtype UpgradeResult: Sendable /// The protocol this upgrader knows how to support. @@ -47,7 +48,7 @@ public protocol NIOTypedHTTPServerProtocolUpgrader { /// The upgrade configuration for the ``NIOTypedHTTPServerUpgradeHandler``. @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public struct NIOTypedHTTPServerUpgradeConfiguration { +public struct NIOTypedHTTPServerUpgradeConfiguration: Sendable { /// The array of potential upgraders. public var upgraders: [any NIOTypedHTTPServerProtocolUpgrader] @@ -174,7 +175,6 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch } private func channelRead(context: ChannelHandlerContext, requestPart: HTTPServerRequestPart) { - let loopBoundContext = context.loopBound switch self.stateMachine.channelReadRequestPart(requestPart) { case .failUpgradePromise(let error): self.upgradeResultPromise.fail(error) @@ -182,8 +182,8 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch case .runNotUpgradingInitializer: self.notUpgradingCompletionHandler(context.channel) .hop(to: context.eventLoop) + .assumeIsolated() .whenComplete { result in - let context = loopBoundContext.value self.upgradingHandlerCompleted(context: context, result, requestHeadAndProtocol: nil) } @@ -196,8 +196,6 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch allHeaderNames: allHeaderNames, connectionHeader: connectionHeader ).whenComplete { result in - let context = loopBoundContext.value - context.eventLoop.assertInEventLoop() self.findingUpgradeCompleted(context: context, requestHead: head, result) } @@ -220,11 +218,12 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch _ result: Result, requestHeadAndProtocol: (HTTPRequestHead, String)? ) { + context.eventLoop.assertInEventLoop() switch self.stateMachine.upgradingHandlerCompleted(result) { case .fireErrorCaughtAndRemoveHandler(let error): self.upgradeResultPromise.fail(error) context.fireErrorCaught(error) - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) case .fireErrorCaughtAndStartUnbuffering(let error): self.upgradeResultPromise.fail(error) @@ -253,7 +252,7 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch ) } self.upgradeResultPromise.succeed(value) - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) case .none: break @@ -269,14 +268,20 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch request: HTTPRequestHead, allHeaderNames: Set, connectionHeader: Set - ) -> EventLoopFuture< - (upgrader: any NIOTypedHTTPServerProtocolUpgrader, responseHeaders: HTTPHeaders, proto: String)? - > { + ) + -> EventLoopFuture< + ( + upgrader: any NIOTypedHTTPServerProtocolUpgrader, + responseHeaders: HTTPHeaders, + proto: String + )? + >.Isolated + { // We want a local copy of the protocol iterator. We'll pass it to the next invocation of the function. var protocolIterator = protocolIterator guard let proto = protocolIterator.next() else { // We're done! No suitable protocol for upgrade. - return context.eventLoop.makeSucceededFuture(nil) + return context.eventLoop.makeSucceededIsolatedFuture(nil) } guard let upgrader = self.upgraders[proto.lowercased()] else { @@ -300,7 +305,6 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch ) } - let loopBoundContext = context.loopBound let responseHeaders = self.buildUpgradeHeaders(protocol: proto) return upgrader.buildUpgradeResponse( channel: context.channel, @@ -308,10 +312,10 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch initialResponseHeaders: responseHeaders ) .hop(to: context.eventLoop) + .assumeIsolated() .map { (upgrader, $0, proto) } .flatMapError { error in // No upgrade here. We want to fire the error down the pipeline, and then try another loop iteration. - let context = loopBoundContext.value context.fireErrorCaught(error) return self.handleUpgradeForProtocol( context: context, @@ -319,7 +323,7 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch request: request, allHeaderNames: allHeaderNames, connectionHeader: connectionHeader - ) + ).nonisolated() } } @@ -328,9 +332,11 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch requestHead: HTTPRequestHead, _ result: Result< ( - upgrader: any NIOTypedHTTPServerProtocolUpgrader, responseHeaders: HTTPHeaders, + upgrader: any NIOTypedHTTPServerProtocolUpgrader, + responseHeaders: HTTPHeaders, proto: String - )?, Error + )?, + Error > ) { switch self.stateMachine.findingUpgraderCompleted(requestHead: requestHead, result) { @@ -344,11 +350,10 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch ) case .runNotUpgradingInitializer: - let loopBoundContext = context.loopBound self.notUpgradingCompletionHandler(context.channel) .hop(to: context.eventLoop) + .assumeIsolated() .whenComplete { result in - let context = loopBoundContext.value self.upgradingHandlerCompleted(context: context, result, requestHeadAndProtocol: nil) } @@ -360,7 +365,7 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch case .fireErrorCaughtAndRemoveHandler(let error): self.upgradeResultPromise.fail(error) context.fireErrorCaught(error) - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) case .none: break @@ -385,17 +390,17 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch // then remove ourselves from the pipeline. let channel = context.channel let pipeline = context.pipeline - let loopBoundContext = context.loopBound - self.removeExtraHandlers(pipeline: pipeline).flatMap { - let context = loopBoundContext.value - return self.sendUpgradeResponse(context: context, responseHeaders: responseHeaders) - }.flatMap { - pipeline.syncOperations.removeHandler(self.httpEncoder) - }.flatMap { () -> EventLoopFuture in - upgrader.upgrade(channel: channel, upgradeRequest: requestHead) - }.hop(to: context.eventLoop) + + self.removeExtraHandlers(pipeline: pipeline) + .assumeIsolated() + .flatMap { + self.sendUpgradeResponse(context: context, responseHeaders: responseHeaders) + }.flatMap { + pipeline.syncOperations.removeHandler(self.httpEncoder) + }.flatMap { () -> EventLoopFuture in + upgrader.upgrade(channel: channel, upgradeRequest: requestHead) + } .whenComplete { result in - let context = loopBoundContext.value self.upgradingHandlerCompleted(context: context, result, requestHeadAndProtocol: (requestHead, proto)) } } @@ -422,7 +427,7 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch } return .andAllSucceed( - self.extraHTTPHandlers.map { pipeline.removeHandler($0) }, + self.extraHTTPHandlers.map { pipeline.syncOperations.removeHandler($0) }, on: pipeline.eventLoop ) } @@ -438,7 +443,7 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch case .fireChannelReadCompleteAndRemoveHandler: context.fireChannelReadComplete() - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) return case .fireInputClosedEvent: @@ -447,4 +452,8 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch } } } + +@available(*, unavailable) +extension NIOTypedHTTPServerUpgradeHandler: Sendable {} + #endif diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift index ff228b9834..f12eba5882 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift @@ -313,9 +313,11 @@ struct NIOTypedHTTPServerUpgraderStateMachine { requestHead: HTTPRequestHead, _ result: Result< ( - upgrader: any NIOTypedHTTPServerProtocolUpgrader, responseHeaders: HTTPHeaders, + upgrader: any NIOTypedHTTPServerProtocolUpgrader, + responseHeaders: HTTPHeaders, proto: String - )?, Error + )?, + Error > ) -> FindingUpgraderCompletedAction? { switch self.state { diff --git a/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift index 97d9df956a..402fc3de2f 100644 --- a/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift @@ -325,7 +325,7 @@ class HTTPClientUpgradeTestCase: XCTestCase { let channel = EmbeddedChannel() - let config: NIOHTTPClientUpgradeConfiguration = ( + let config: NIOHTTPClientUpgradeSendableConfiguration = ( upgraders: clientUpgraders, completionHandler: { context in channel.pipeline.removeHandler(clientHTTPHandler, promise: nil) diff --git a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift index 11e8f848e0..1122ad61d6 100644 --- a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift +++ b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift @@ -61,7 +61,7 @@ private func setUpClientChannel( let channel = EmbeddedChannel() - let config: NIOHTTPClientUpgradeConfiguration = ( + let config: NIOHTTPClientUpgradeSendableConfiguration = ( upgraders: clientUpgraders, completionHandler: { context in channel.pipeline.removeHandler(clientHTTPHandler, promise: nil)