diff --git a/ReactiveSwift.xcodeproj/project.pbxproj b/ReactiveSwift.xcodeproj/project.pbxproj index e870caf2e..cefedd3a8 100644 --- a/ReactiveSwift.xcodeproj/project.pbxproj +++ b/ReactiveSwift.xcodeproj/project.pbxproj @@ -197,6 +197,18 @@ 9A2D5D0E259F8D1F005682ED /* ScanMap.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D0C259F8D1F005682ED /* ScanMap.swift */; }; 9A2D5D0F259F8D1F005682ED /* ScanMap.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D0C259F8D1F005682ED /* ScanMap.swift */; }; 9A2D5D10259F8D1F005682ED /* ScanMap.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D0C259F8D1F005682ED /* ScanMap.swift */; }; + 9A2D5D53259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; }; + 9A2D5D54259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; }; + 9A2D5D55259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; }; + 9A2D5D56259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; }; + 9A2D5D5D259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; }; + 9A2D5D5E259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; }; + 9A2D5D5F259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; }; + 9A2D5D60259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; }; + 9A2D5D67259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; }; + 9A2D5D68259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; }; + 9A2D5D69259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; }; + 9A2D5D6A259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; }; 9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; 9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; 9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; @@ -424,6 +436,9 @@ 9A2D5CF8259F8634005682ED /* UniqueValues.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UniqueValues.swift; sourceTree = ""; }; 9A2D5D02259F8C39005682ED /* Reduce.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Reduce.swift; sourceTree = ""; }; 9A2D5D0C259F8D1F005682ED /* ScanMap.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ScanMap.swift; sourceTree = ""; }; + 9A2D5D52259FA000005682ED /* Throttle.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Throttle.swift; sourceTree = ""; }; + 9A2D5D5C259FA0DD005682ED /* Debounce.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Debounce.swift; sourceTree = ""; }; + 9A2D5D66259FA59E005682ED /* CollectEvery.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CollectEvery.swift; sourceTree = ""; }; 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UninhabitedTypeGuards.swift; sourceTree = ""; }; 9A681A9D1E5A241B00B097CF /* DeprecationSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DeprecationSpec.swift; sourceTree = ""; }; 9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingProperty.swift; sourceTree = ""; }; @@ -571,6 +586,9 @@ 9A2D5C80259F7E3E005682ED /* DematerializeResults.swift */, 9A2D5C8A259F7ED5005682ED /* Dematerialize.swift */, 9A2D5C76259F7D3D005682ED /* AttemptMap.swift */, + 9A2D5D52259FA000005682ED /* Throttle.swift */, + 9A2D5D5C259FA0DD005682ED /* Debounce.swift */, + 9A2D5D66259FA59E005682ED /* CollectEvery.swift */, ); path = Observers; sourceTree = ""; @@ -1029,6 +1047,7 @@ 57A4D1B11BA13D7A00F7D4B1 /* Optional.swift in Sources */, 57A4D1B41BA13D7A00F7D4B1 /* Disposable.swift in Sources */, 57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */, + 9A2D5D6A259FA59E005682ED /* CollectEvery.swift in Sources */, 57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */, 9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */, 9A2D5CF2259F85AE005682ED /* SkipRepeats.swift in Sources */, @@ -1045,6 +1064,9 @@ 9A2D5CE8259F852B005682ED /* CombinePrevious.swift in Sources */, 9A67963E1F6059440058C5B4 /* UninhabitedTypeGuards.swift in Sources */, 9A2D5D06259F8C39005682ED /* Reduce.swift in Sources */, + 9A2D5D56259FA000005682ED /* Throttle.swift in Sources */, + 9A67963E1F6059440058C5B4 /* UninhabitedTypeGuards.swift in Sources */, + 9A2D5D60259FA0DD005682ED /* Debounce.swift in Sources */, 9AFA491424E9A196003D263C /* Map.swift in Sources */, 9A2D5D33259F942B005682ED /* LazyMap.swift in Sources */, 9A2D5CFC259F8634005682ED /* UniqueValues.swift in Sources */, @@ -1111,6 +1133,7 @@ A9F793341B60D0140026BCBA /* Optional.swift in Sources */, A9B315BC1B3940810001CB9C /* Disposable.swift in Sources */, A9B315BE1B3940810001CB9C /* Event.swift in Sources */, + 9A2D5D69259FA59E005682ED /* CollectEvery.swift in Sources */, A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */, 9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */, 9A2D5CF1259F85AE005682ED /* SkipRepeats.swift in Sources */, @@ -1130,6 +1153,9 @@ 9AFA491324E9A196003D263C /* Map.swift in Sources */, 9A2D5CFB259F8634005682ED /* UniqueValues.swift in Sources */, 9A2D5C65259F7B47005682ED /* MaterializeAsResult.swift in Sources */, + 9A2D5D55259FA000005682ED /* Throttle.swift in Sources */, + 9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */, + 9A2D5D5F259FA0DD005682ED /* Debounce.swift in Sources */, 9AFA491324E9A196003D263C /* Map.swift in Sources */, 9A2D5D32259F942B005682ED /* LazyMap.swift in Sources */, 9A2D5C8D259F7ED5005682ED /* Dematerialize.swift in Sources */, @@ -1166,6 +1192,7 @@ D871D69F1B3B29A40070F16C /* Optional.swift in Sources */, D08C54B61A69A3DB00AD8286 /* Event.swift in Sources */, D0C312D319EF2A5800984962 /* Disposable.swift in Sources */, + 9A2D5D67259FA59E005682ED /* CollectEvery.swift in Sources */, 9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */, EBCC7DBC1BBF010C00A2AE92 /* Signal.Observer.swift in Sources */, 9A2D5CEF259F85AE005682ED /* SkipRepeats.swift in Sources */, @@ -1182,6 +1209,9 @@ 9A2D5CE5259F852B005682ED /* CombinePrevious.swift in Sources */, 9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */, 9A2D5D03259F8C39005682ED /* Reduce.swift in Sources */, + 9A2D5D53259FA000005682ED /* Throttle.swift in Sources */, + 9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */, + 9A2D5D5D259FA0DD005682ED /* Debounce.swift in Sources */, 9AFA491124E9A196003D263C /* Map.swift in Sources */, 9A2D5D30259F942B005682ED /* LazyMap.swift in Sources */, 9A2D5CF9259F8634005682ED /* UniqueValues.swift in Sources */, @@ -1248,6 +1278,7 @@ D08C54B41A69A2AF00AD8286 /* Signal.swift in Sources */, D8E84A671B3B32FB00C3E831 /* Optional.swift in Sources */, D0C312D419EF2A5800984962 /* Disposable.swift in Sources */, + 9A2D5D68259FA59E005682ED /* CollectEvery.swift in Sources */, D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */, 9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */, 9A2D5CF0259F85AE005682ED /* SkipRepeats.swift in Sources */, @@ -1267,6 +1298,9 @@ 9AFA491224E9A196003D263C /* Map.swift in Sources */, 9A2D5CFA259F8634005682ED /* UniqueValues.swift in Sources */, 9A2D5C64259F7B47005682ED /* MaterializeAsResult.swift in Sources */, + 9A2D5D54259FA000005682ED /* Throttle.swift in Sources */, + 9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */, + 9A2D5D5E259FA0DD005682ED /* Debounce.swift in Sources */, 9AFA491224E9A196003D263C /* Map.swift in Sources */, 9A2D5D31259F942B005682ED /* LazyMap.swift in Sources */, 9A2D5C8C259F7ED5005682ED /* Dematerialize.swift in Sources */, diff --git a/Sources/Event.swift b/Sources/Event.swift index d3eb339c8..55ca1ea51 100644 --- a/Sources/Event.swift +++ b/Sources/Event.swift @@ -423,170 +423,37 @@ extension Signal.Event { } internal static func throttle(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation { - precondition(interval >= 0) - - return { action, lifetime in - let state: Atomic> = Atomic(ThrottleState()) - let schedulerDisposable = SerialDisposable() - - lifetime.observeEnded { - schedulerDisposable.dispose() - scheduler.schedule { action(.interrupted) } - } - - return Signal.Observer { event in - guard let value = event.value else { - schedulerDisposable.inner = scheduler.schedule { - action(event) - } - return - } - - let scheduleDate: Date = state.modify { state in - state.pendingValue = value - - let proposedScheduleDate: Date - if let previousDate = state.previousDate, previousDate <= scheduler.currentDate { - proposedScheduleDate = previousDate.addingTimeInterval(interval) - } else { - proposedScheduleDate = scheduler.currentDate - } - - return proposedScheduleDate < scheduler.currentDate ? scheduler.currentDate : proposedScheduleDate - } - - schedulerDisposable.inner = scheduler.schedule(after: scheduleDate) { - if let pendingValue = state.modify({ $0.retrieveValue(date: scheduleDate) }) { - action(.value(pendingValue)) - } - } - } + return { downstream, lifetime in + Operators.Throttle(downstream: downstream, downstreamLifetime: lifetime, target: scheduler, interval: interval) } } internal static func debounce(_ interval: TimeInterval, on scheduler: DateScheduler, discardWhenCompleted: Bool) -> Transformation { - precondition(interval >= 0) - - return { action, lifetime in - let state: Atomic> = Atomic(ThrottleState(previousDate: scheduler.currentDate, pendingValue: nil)) - let d = SerialDisposable() - - lifetime.observeEnded { - d.dispose() - scheduler.schedule { action(.interrupted) } - } - - return Signal.Observer { event in - switch event { - case let .value(value): - state.modify { state in - state.pendingValue = value - } - let date = scheduler.currentDate.addingTimeInterval(interval) - d.inner = scheduler.schedule(after: date) { - if let pendingValue = state.modify({ $0.retrieveValue(date: date) }) { - action(.value(pendingValue)) - } - } - - case .completed: - d.inner = scheduler.schedule { - let pending: (value: Value, previousDate: Date)? = state.modify { state in - defer { state.pendingValue = nil } - guard let pendingValue = state.pendingValue, let previousDate = state.previousDate else { return nil } - return (pendingValue, previousDate) - } - if !discardWhenCompleted, let (pendingValue, previousDate) = pending { - scheduler.schedule(after: previousDate.addingTimeInterval(interval)) { - action(.value(pendingValue)) - action(.completed) - } - } else { - action(.completed) - } - } - - case .failed, .interrupted: - d.inner = scheduler.schedule { - action(event) - } - } - } + return { downstream, lifetime in + Operators.Debounce( + downstream: downstream, + downstreamLifetime: lifetime, + target: scheduler, + interval: interval, + discardWhenCompleted: discardWhenCompleted + ) } } internal static func collect(every interval: DispatchTimeInterval, on scheduler: DateScheduler, skipEmpty: Bool, discardWhenCompleted: Bool) -> Transformation<[Value], Error> { - return { action, lifetime in - let state = Atomic>(.init(skipEmpty: skipEmpty)) - let d = SerialDisposable() - - d.inner = scheduler.schedule(after: scheduler.currentDate.addingTimeInterval(interval), interval: interval, leeway: interval * 0.1) { - let (currentValues, isCompleted) = state.modify { ($0.collect(), $0.isCompleted) } - if let currentValues = currentValues { - action(.value(currentValues)) - } - if isCompleted { - action(.completed) - } - } - - lifetime.observeEnded { - d.dispose() - scheduler.schedule { action(.interrupted) } - } - - return Signal.Observer { event in - switch event { - case let .value(value): - state.modify { $0.values.append(value) } - case let .failed(error): - d.inner = scheduler.schedule { action(.failed(error)) } - case .completed where !discardWhenCompleted: - state.modify { $0.isCompleted = true } - case .completed: - d.inner = scheduler.schedule { action(.completed) } - case .interrupted: - d.inner = scheduler.schedule { action(.interrupted) } - } - } + return { downstream, lifetime in + Operators.CollectEvery( + downstream: downstream, + downstreamLifetime: lifetime, + target: scheduler, + interval: interval, + skipEmpty: skipEmpty, + discardWhenCompleted: discardWhenCompleted + ) } } } -private struct CollectEveryState { - let skipEmpty: Bool - var values: [Value] = [] - var isCompleted: Bool = false - - init(skipEmpty: Bool) { - self.skipEmpty = skipEmpty - } - - var hasValues: Bool { - return !values.isEmpty || !skipEmpty - } - - mutating func collect() -> [Value]? { - guard hasValues else { return nil } - defer { values.removeAll() } - return values - } -} - -private struct ThrottleState { - var previousDate: Date? - var pendingValue: Value? - - mutating func retrieveValue(date: Date) -> Value? { - defer { - if pendingValue != nil { - pendingValue = nil - previousDate = date - } - } - return pendingValue - } -} extension Signal.Event where Error == Never { internal static func promoteError(_: F.Type) -> Transformation { diff --git a/Sources/Observers/CollectEvery.swift b/Sources/Observers/CollectEvery.swift new file mode 100644 index 000000000..91d61ea8c --- /dev/null +++ b/Sources/Observers/CollectEvery.swift @@ -0,0 +1,78 @@ +import Dispatch + +extension Operators { + internal final class CollectEvery: UnaryAsyncOperator { + let interval: DispatchTimeInterval + let discardWhenCompleted: Bool + let targetWithClock: DateScheduler + + private let state: Atomic> + private let timerDisposable = SerialDisposable() + + init( + downstream: Observer<[Value], Error>, + downstreamLifetime: Lifetime, + target: DateScheduler, + interval: DispatchTimeInterval, + skipEmpty: Bool, + discardWhenCompleted: Bool + ) { + self.interval = interval + self.discardWhenCompleted = discardWhenCompleted + self.targetWithClock = target + self.state = Atomic(CollectEveryState(skipEmpty: skipEmpty)) + + super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target) + + downstreamLifetime += timerDisposable + + let initialDate = targetWithClock.currentDate.addingTimeInterval(interval) + timerDisposable.inner = targetWithClock.schedule(after: initialDate, interval: interval, leeway: interval * 0.1) { + let (currentValues, isCompleted) = self.state.modify { ($0.collect(), $0.isCompleted) } + + if let currentValues = currentValues { + self.unscheduledSend(currentValues) + } + + if isCompleted { + self.unscheduledTerminate(.completed) + } + } + } + + override func receive(_ value: Value) { + state.modify { $0.values.append(value) } + } + + override func terminate(_ termination: Termination) { + guard isActive else { return } + + if case .completed = termination, !discardWhenCompleted { + state.modify { $0.isCompleted = true } + } else { + timerDisposable.dispose() + super.terminate(termination) + } + } + } +} + +private struct CollectEveryState { + let skipEmpty: Bool + var values: [Value] = [] + var isCompleted: Bool = false + + init(skipEmpty: Bool) { + self.skipEmpty = skipEmpty + } + + var hasValues: Bool { + return !values.isEmpty || !skipEmpty + } + + mutating func collect() -> [Value]? { + guard hasValues else { return nil } + defer { values.removeAll() } + return values + } +} diff --git a/Sources/Observers/Debounce.swift b/Sources/Observers/Debounce.swift new file mode 100644 index 000000000..bfe0ef2a3 --- /dev/null +++ b/Sources/Observers/Debounce.swift @@ -0,0 +1,77 @@ +import Foundation + +extension Operators { + internal final class Debounce: UnaryAsyncOperator { + let interval: TimeInterval + let discardWhenCompleted: Bool + let targetWithClock: DateScheduler + + private let state: Atomic> = Atomic(DebounceState()) + private let schedulerDisposable = SerialDisposable() + + init( + downstream: Observer, + downstreamLifetime: Lifetime, + target: DateScheduler, + interval: TimeInterval, + discardWhenCompleted: Bool + ) { + precondition(interval >= 0) + + self.interval = interval + self.discardWhenCompleted = discardWhenCompleted + self.targetWithClock = target + + super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target) + + downstreamLifetime += schedulerDisposable + } + + override func receive(_ value: Value) { + let now = targetWithClock.currentDate + + state.modify { state in + state.lastUpdated = now + state.pendingValue = value + } + let targetDate = now.addingTimeInterval(interval) + schedulerDisposable.inner = targetWithClock.schedule(after: targetDate) { + if let pendingValue = self.state.modify({ $0.retrieve() }) { + self.unscheduledSend(pendingValue) + } + } + } + + override func terminate(_ termination: Termination) { + guard isActive else { return } + schedulerDisposable.dispose() + + if case .completed = termination { + let pending: (value: Value?, lastUpdated: Date) = state.modify { state in + return (state.retrieve(), state.lastUpdated) + } + + if !discardWhenCompleted, let pendingValue = pending.value { + targetWithClock.schedule(after: pending.lastUpdated.addingTimeInterval(interval)) { + self.unscheduledSend(pendingValue) + super.terminate(.completed) + } + } else { + super.terminate(.completed) + } + } else { + super.terminate(termination) + } + } + } +} + +private struct DebounceState { + var lastUpdated: Date = .distantPast + var pendingValue: Value? + + mutating func retrieve() -> Value? { + defer { pendingValue = nil } + return pendingValue + } +} diff --git a/Sources/Observers/Throttle.swift b/Sources/Observers/Throttle.swift new file mode 100644 index 000000000..a70f4bfdf --- /dev/null +++ b/Sources/Observers/Throttle.swift @@ -0,0 +1,64 @@ +import Foundation + +extension Operators { + internal final class Throttle: UnaryAsyncOperator { + let interval: TimeInterval + let targetWithClock: DateScheduler + + private let state: Atomic> = Atomic(ThrottleState()) + private let schedulerDisposable = SerialDisposable() + + init( + downstream: Observer, + downstreamLifetime: Lifetime, + target: DateScheduler, + interval: TimeInterval + ) { + precondition(interval >= 0) + + self.interval = interval + self.targetWithClock = target + super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target) + + downstreamLifetime += schedulerDisposable + } + + override func receive(_ value: Value) { + let scheduleDate: Date = state.modify { state in + state.pendingValue = value + + let proposedScheduleDate: Date + if let previousDate = state.previousDate, previousDate <= targetWithClock.currentDate { + proposedScheduleDate = previousDate.addingTimeInterval(interval) + } else { + proposedScheduleDate = targetWithClock.currentDate + } + + return proposedScheduleDate < targetWithClock.currentDate ? targetWithClock.currentDate : proposedScheduleDate + } + + schedulerDisposable.inner = targetWithClock.schedule(after: scheduleDate) { + guard self.isActive else { return } + + if let pendingValue = self.state.modify({ $0.retrieveValue(date: scheduleDate) }) { + self.unscheduledSend(pendingValue) + } + } + } + } +} + +private struct ThrottleState { + var previousDate: Date? + var pendingValue: Value? + + mutating func retrieveValue(date: Date) -> Value? { + defer { + if pendingValue != nil { + pendingValue = nil + previousDate = date + } + } + return pendingValue + } +} diff --git a/Sources/Observers/UnaryAsyncOperator.swift b/Sources/Observers/UnaryAsyncOperator.swift index 4f9616b76..6821e5f22 100644 --- a/Sources/Observers/UnaryAsyncOperator.swift +++ b/Sources/Observers/UnaryAsyncOperator.swift @@ -46,16 +46,29 @@ internal class UnaryAsyncOperator: downstream.receive(value) } + /// Signal termination to the downstream without any implicit scheduling on `target`. + /// + /// - important: Subclasses must invoke this only after having hopped onto the target scheduler. + final func unscheduledTerminate(_ termination: Termination) { + if self.state.tryTransition(from: .active, to: .terminated) { + if case .completed = termination { + self.onCompleted() + } + + self.downstream.terminate(termination) + } + } + open override func terminate(_ termination: Termination) { // The atomic transition here must happen **after** we hop onto the target scheduler. This is to preserve the timing // behaviour observed in previous versions of ReactiveSwift. target.schedule { - if self.state.tryTransition(from: .active, to: .terminated) { - self.downstream.terminate(termination) - } + self.unscheduledTerminate(termination) } } + + open func onCompleted() {} } private enum AsyncOperatorState: Int32 {