Skip to content

Commit

Permalink
Fix some strict concurrency warnings (apple#310)
Browse files Browse the repository at this point in the history
# Motivation

There were a few new strict concurrency warnings that this PR fixes.
  • Loading branch information
FranzBusch authored Apr 4, 2024
1 parent d162617 commit 46b4464
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ let package = Package(
.library(name: "AsyncAlgorithms", targets: ["AsyncAlgorithms"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.4"),
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.0"),
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
],
targets: [
Expand Down
17 changes: 10 additions & 7 deletions Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
typealias SuspendedProducer = UnsafeContinuation<Void, Never>
typealias SuspendedConsumer = UnsafeContinuation<Result<Base.Element, Error>?, Never>

// We are using UnsafeTransfer here since we have to get the elements from the task
// into the consumer task. This is a transfer but we cannot prove this to the compiler at this point
// since next is not marked as transferring the return value.
fileprivate enum State {
case initial(base: Base)
case buffering(
task: Task<Void, Never>,
buffer: Deque<Result<Element, Error>>,
buffer: Deque<Result<UnsafeTransfer<Element>, Error>>,
suspendedProducer: SuspendedProducer?,
suspendedConsumer: SuspendedConsumer?
)
case modifying
case finished(buffer: Deque<Result<Element, Error>>)
case finished(buffer: Deque<Result<UnsafeTransfer<Element>, Error>>)
}

private var state: State
Expand Down Expand Up @@ -139,7 +142,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
// we have to stack the new element or suspend the producer if the buffer is full
precondition(buffer.count < limit, "Invalid state. The buffer should be available for stacking a new element.")
self.state = .modifying
buffer.append(.success(element))
buffer.append(.success(.init(element)))
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil)
return .none

Expand Down Expand Up @@ -218,7 +221,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
self.state = .modifying
let result = buffer.popFirst()!
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil)
return .returnResult(producerContinuation: suspendedProducer, result: result)
return .returnResult(producerContinuation: suspendedProducer, result: result.map { $0.wrapped })

case .buffering(_, _, _, .some):
preconditionFailure("Invalid states. There is already a suspended consumer.")
Expand All @@ -233,7 +236,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
self.state = .modifying
let result = buffer.popFirst()!
self.state = .finished(buffer: buffer)
return .returnResult(producerContinuation: nil, result: result)
return .returnResult(producerContinuation: nil, result: result.map { $0.wrapped })
}
}

Expand All @@ -257,7 +260,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
self.state = .modifying
let result = buffer.popFirst()!
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil)
return .returnResult(producerContinuation: suspendedProducer, result: result)
return .returnResult(producerContinuation: suspendedProducer, result: result.map { $0.wrapped })

case .buffering(_, _, _, .some):
preconditionFailure("Invalid states. There is already a suspended consumer.")
Expand All @@ -272,7 +275,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
self.state = .modifying
let result = buffer.popFirst()!
self.state = .finished(buffer: buffer)
return .returnResult(producerContinuation: nil, result: result)
return .returnResult(producerContinuation: nil, result: result.map { $0.wrapped })
}
}

Expand Down
23 changes: 14 additions & 9 deletions Sources/AsyncAlgorithms/Buffer/UnboundedBufferStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
case bufferingOldest(Int)
}

// We are using UnsafeTransfer here since we have to get the elements from the task
// into the consumer task. This is a transfer but we cannot prove this to the compiler at this point
// since next is not marked as transferring the return value.
fileprivate enum State {
case initial(base: Base)
case buffering(
task: Task<Void, Never>,
buffer: Deque<Result<Element, Error>>,
buffer: Deque<Result<UnsafeTransfer<Element>, Error>>,
suspendedConsumer: SuspendedConsumer?
)
case modifying
case finished(buffer: Deque<Result<Element, Error>>)
case finished(buffer: Deque<Result<UnsafeTransfer<Element>, Error>>)
}

private var state: State
Expand Down Expand Up @@ -84,15 +87,15 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
self.state = .modifying
switch self.policy {
case .unlimited:
buffer.append(.success(element))
buffer.append(.success(.init(element)))
case .bufferingNewest(let limit):
if buffer.count >= limit {
_ = buffer.popFirst()
}
buffer.append(.success(element))
buffer.append(.success(.init(element)))
case .bufferingOldest(let limit):
if buffer.count < limit {
buffer.append(.success(element))
buffer.append(.success(.init(element)))
}
}
self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil)
Expand Down Expand Up @@ -170,7 +173,7 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
self.state = .modifying
let result = buffer.popFirst()!
self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil)
return .returnResult(result)
return .returnResult(result.map { $0.wrapped })

case .modifying:
preconditionFailure("Invalid state.")
Expand All @@ -182,7 +185,7 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
self.state = .modifying
let result = buffer.popFirst()!
self.state = .finished(buffer: buffer)
return .returnResult(result)
return .returnResult(result.map { $0.wrapped })
}
}

Expand All @@ -208,7 +211,7 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
self.state = .modifying
let result = buffer.popFirst()!
self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil)
return .resumeConsumer(result)
return .resumeConsumer(result.map { $0.wrapped })

case .modifying:
preconditionFailure("Invalid state.")
Expand All @@ -220,7 +223,7 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
self.state = .modifying
let result = buffer.popFirst()!
self.state = .finished(buffer: buffer)
return .resumeConsumer(result)
return .resumeConsumer(result.map { $0.wrapped })
}
}

Expand Down Expand Up @@ -251,3 +254,5 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {

extension UnboundedBufferStateMachine: Sendable where Base: Sendable { }
extension UnboundedBufferStateMachine.State: Sendable where Base: Sendable { }


3 changes: 0 additions & 3 deletions Sources/AsyncAlgorithms/Channels/ChannelStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
//===----------------------------------------------------------------------===//
import OrderedCollections

// NOTE: this is only marked as unchecked since the swift-collections tag is before auditing for Sendable
extension OrderedSet: @unchecked Sendable where Element: Sendable { }

struct ChannelStateMachine<Element: Sendable, Failure: Error>: Sendable {
private struct SuspendedProducer: Hashable, Sendable {
let id: UInt64
Expand Down
19 changes: 19 additions & 0 deletions Sources/AsyncAlgorithms/UnsafeTransfer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2024 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

/// A wrapper struct to unconditionally to transfer an non-Sendable value.
struct UnsafeTransfer<Element>: @unchecked Sendable {
let wrapped: Element

init(_ wrapped: Element) {
self.wrapped = wrapped
}
}

0 comments on commit 46b4464

Please sign in to comment.