diff --git a/src/operators/switchMapTo.ts b/src/operators/switchMapTo.ts index 51ec2d977c..5c143bd114 100644 --- a/src/operators/switchMapTo.ts +++ b/src/operators/switchMapTo.ts @@ -1,9 +1,12 @@ import Operator from '../Operator'; +import Observer from '../Observer'; import Observable from '../Observable'; import Subscriber from '../Subscriber'; import Subscription from '../Subscription'; - -import { MergeMapToSubscriber } from './mergeMapTo-support'; +import tryCatch from '../util/tryCatch'; +import { errorObject } from '../util/errorObject'; +import OuterSubscriber from '../OuterSubscriber'; +import subscribeToResult from '../util/subscribeToResult'; export default function switchMapTo(observable: Observable, projectResult?: (outerValue: T, @@ -26,13 +29,62 @@ class SwitchMapToOperator implements Operator { } } -class SwitchMapToSubscriber extends MergeMapToSubscriber { - constructor(destination: Subscriber, - observable: Observable, - resultSelector?: (outerValue: T, - innerValue: R, - outerIndex: number, - innerIndex: number) => R2) { - super(destination, observable, resultSelector, 1); +class SwitchMapToSubscriber extends OuterSubscriber { + private innerSubscription: Subscription; + private hasCompleted = false; + index: number = 0; + + constructor(destination: Observer, + private inner: Observable, + private resultSelector?: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => R2) { + super(destination); + } + + _next(value: any) { + const index = this.index++; + const innerSubscription = this.innerSubscription; + if (innerSubscription) { + innerSubscription.unsubscribe(); + } + this.add(this.innerSubscription = subscribeToResult(this, this.inner, value, index)); + } + + _complete() { + const innerSubscription = this.innerSubscription; + this.hasCompleted = true; + if (!innerSubscription || innerSubscription.isUnsubscribed) { + this.destination.complete(); + } + } + + notifyComplete(innerSub: Subscription) { + this.remove(innerSub); + const prevSubscription = this.innerSubscription; + if (prevSubscription) { + prevSubscription.unsubscribe(); + } + this.innerSubscription = null; + + if (this.hasCompleted) { + this.destination.complete(); + } + } + + notifyError(err: any) { + this.destination.error(err); + } + + notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) { + const { resultSelector, destination } = this; + if (resultSelector) { + const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex); + if (result === errorObject) { + destination.error(errorObject.e); + } else { + destination.next(result); + } + } else { + destination.next(innerValue); + } } }