Skip to content

Commit

Permalink
Add async version of NIOThreadPool.runIfActive (#2566)
Browse files Browse the repository at this point in the history
* Add async version of NIOThreadPool.runIfActive

* Changes from comments in PR

Remove no longer supported code
Add tests for errors being thrown, and the thread pool not being active

* Collapse async runIfActive into one function

* remove whitespace

* Make T Sendable

---------

Co-authored-by: Cory Benfield <lukasa@apple.com>
  • Loading branch information
adam-fowler and Lukasa authored Oct 25, 2023
1 parent 54c85cb commit 95a4eaa
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 0 deletions.
23 changes: 23 additions & 0 deletions Sources/NIOPosix/NIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,29 @@ extension NIOThreadPool {
}
return promise.futureResult
}

/// Runs the submitted closure if the thread pool is still active, otherwise throw an error.
/// The closure will be run on the thread pool so can do blocking work.
///
/// - parameters:
/// - body: The closure which performs some blocking work to be done on the thread pool.
/// - returns: result of the passed closure.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public func runIfActive<T: Sendable>(_ body: @escaping @Sendable () throws -> T) async throws -> T {
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<T, Error>) in
self.submit { shouldRun in
guard case shouldRun = NIOThreadPool.WorkItemState.active else {
cont.resume(throwing: NIOThreadPoolError.ThreadPoolInactive())
return
}
do {
try cont.resume(returning: body())
} catch {
cont.resume(throwing: error)
}
}
}
}
}

extension NIOThreadPool {
Expand Down
49 changes: 49 additions & 0 deletions Tests/NIOPosixTests/NIOThreadPoolTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import XCTest
@testable import NIOPosix
import Atomics
import Dispatch
import NIOConcurrencyHelpers
import NIOEmbedded
Expand Down Expand Up @@ -110,6 +111,54 @@ class NIOThreadPoolTest: XCTestCase {
}
}

func testAsyncThreadPool() async throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
let numberOfThreads = 1
let pool = NIOThreadPool(numberOfThreads: numberOfThreads)
pool.start()
do {
let hitCount = ManagedAtomic(false)
try await pool.runIfActive {
hitCount.store(true, ordering: .relaxed)
}
XCTAssertEqual(hitCount.load(ordering: .relaxed), true)
} catch {}
try await pool.shutdownGracefully()
}

func testAsyncThreadPoolErrorPropagation() async throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
struct ThreadPoolError: Error {}
let numberOfThreads = 1
let pool = NIOThreadPool(numberOfThreads: numberOfThreads)
pool.start()
do {
try await pool.runIfActive {
throw ThreadPoolError()
}
XCTFail("Should not get here as closure sent to runIfActive threw an error")
} catch {
XCTAssertNotNil(error as? ThreadPoolError, "Error thrown should be of type ThreadPoolError")
}
try await pool.shutdownGracefully()
}

func testAsyncThreadPoolNotActiveError() async throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
struct ThreadPoolError: Error {}
let numberOfThreads = 1
let pool = NIOThreadPool(numberOfThreads: numberOfThreads)
do {
try await pool.runIfActive {
throw ThreadPoolError()
}
XCTFail("Should not get here as thread pool isn't active")
} catch {
XCTAssertNotNil(error as? NIOThreadPoolError.ThreadPoolInactive, "Error thrown should be of type ThreadPoolError")
}
try await pool.shutdownGracefully()
}

func testAsyncShutdownWorks() async throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
let threadPool = NIOThreadPool(numberOfThreads: 17)
Expand Down

0 comments on commit 95a4eaa

Please sign in to comment.