Skip to content

Commit

Permalink
Add async version of NIOThreadPool.runIfActive
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler committed Oct 24, 2023
1 parent 86d05fb commit f81b807
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
42 changes: 42 additions & 0 deletions Sources/NIOPosix/NIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,48 @@ extension NIOThreadPool {
}
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOThreadPool {
#if swift(>=5.7)
/// Runs the submitted closure if the thread pool is still active, otherwise throw an error.
/// The closure will be run on the thread pool so can do blocking work.
///
/// - parameters:
/// - body: The closure which performs some blocking work to be done on the thread pool.
/// - returns: result of the passed closure.
@preconcurrency
public func runIfActive<T>(_ body: @escaping @Sendable () throws -> T) async throws -> T {
try await self._runIfActive(body)
}
#else
/// Runs the submitted closure if the thread pool is still active, otherwise throw an error.
/// The closure will be run on the thread pool so can do blocking work.
///
/// - parameters:
/// - body: The closure which performs some blocking work to be done on the thread pool.
/// - returns: result of the passed closure.
public func runIfActive<T>(_ body: @escaping () throws -> T) async throws -> T {
try await self._runIfActive(body)
}
#endif

private func _runIfActive<T>(_ body: @escaping () throws -> T) async throws -> T {
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<T, Error>) in
self.submit { shouldRun in
guard case shouldRun = NIOThreadPool.WorkItemState.active else {
cont.resume(throwing: NIOThreadPoolError.ThreadPoolInactive())
return
}
do {
try cont.resume(returning: body())
} catch {
cont.resume(throwing: error)
}
}
}
}
}

extension NIOThreadPool {
@preconcurrency
public func shutdownGracefully(_ callback: @escaping @Sendable (Error?) -> Void) {
Expand Down
16 changes: 16 additions & 0 deletions Tests/NIOPosixTests/NIOThreadPoolTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import XCTest
@testable import NIOPosix
import Atomics
import Dispatch
import NIOConcurrencyHelpers
import NIOEmbedded
Expand Down Expand Up @@ -110,6 +111,21 @@ class NIOThreadPoolTest: XCTestCase {
}
}

func testAsyncThreadPool() async throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
let numberOfThreads = 1
let pool = NIOThreadPool(numberOfThreads: numberOfThreads)
pool.start()
do {
let hitCount = ManagedAtomic(false)
try await pool.runIfActive {
hitCount.store(true, ordering: .relaxed)
}
XCTAssertEqual(hitCount.load(ordering: .relaxed), true)
} catch {}
try await pool.shutdownGracefully()
}

func testAsyncShutdownWorks() async throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
let threadPool = NIOThreadPool(numberOfThreads: 17)
Expand Down

0 comments on commit f81b807

Please sign in to comment.