From 1df8928f74ab85b265fe3f75d1d37bf4c128fd6f Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Tue, 26 Jan 2016 10:43:24 -0800 Subject: [PATCH] feat(OuterSubscriber): notifyNext passes innersubscriber when next emits - notifyNext() passes inner subscriber as same as notifyError(), notifyComplete() does, supports subscription management closes #1250 --- src/InnerSubscriber.ts | 8 ++++---- src/OuterSubscriber.ts | 4 +++- src/operator/buffer.ts | 5 ++++- src/operator/bufferWhen.ts | 5 ++++- src/operator/combineLatest.ts | 7 +++++-- src/operator/debounce.ts | 5 ++++- src/operator/exhaustMap.ts | 5 ++++- src/operator/expand.ts | 5 ++++- src/operator/mergeMap.ts | 7 +++++-- src/operator/mergeMapTo.ts | 5 ++++- src/operator/mergeScan.ts | 5 ++++- src/operator/race.ts | 7 +++++-- src/operator/retryWhen.ts | 5 ++++- src/operator/sample.ts | 5 ++++- src/operator/skipUntil.ts | 5 ++++- src/operator/switch.ts | 5 ++++- src/operator/switchMap.ts | 5 ++++- src/operator/switchMapTo.ts | 5 ++++- src/operator/takeUntil.ts | 5 ++++- src/operator/throttle.ts | 5 ++++- src/operator/windowToggle.ts | 5 ++++- src/operator/withLatestFrom.ts | 9 ++++++--- src/operator/zip.ts | 5 ++++- 23 files changed, 96 insertions(+), 31 deletions(-) diff --git a/src/InnerSubscriber.ts b/src/InnerSubscriber.ts index 306443eb74..6fe5a1bad6 100644 --- a/src/InnerSubscriber.ts +++ b/src/InnerSubscriber.ts @@ -8,16 +8,16 @@ export class InnerSubscriber extends Subscriber { super(); } - protected _next(value: R) { - this.parent.notifyNext(this.outerValue, value, this.outerIndex, this.index++); + protected _next(value: R): void { + this.parent.notifyNext(this.outerValue, value, this.outerIndex, this.index++, this); } - protected _error(error: any) { + protected _error(error: any): void { this.parent.notifyError(error, this); this.unsubscribe(); } - protected _complete() { + protected _complete(): void { this.parent.notifyComplete(this); this.unsubscribe(); } diff --git a/src/OuterSubscriber.ts b/src/OuterSubscriber.ts index 2bfab5a6df..19ac28710b 100644 --- a/src/OuterSubscriber.ts +++ b/src/OuterSubscriber.ts @@ -2,7 +2,9 @@ import {Subscriber} from './Subscriber'; import {InnerSubscriber} from './InnerSubscriber'; export class OuterSubscriber extends Subscriber { - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this.destination.next(innerValue); } diff --git a/src/operator/buffer.ts b/src/operator/buffer.ts index 5db05eee40..4cd1da9b31 100644 --- a/src/operator/buffer.ts +++ b/src/operator/buffer.ts @@ -3,6 +3,7 @@ import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -44,7 +45,9 @@ class BufferSubscriber extends OuterSubscriber { this.buffer.push(value); } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { const buffer = this.buffer; this.buffer = []; this.destination.next(buffer); diff --git a/src/operator/bufferWhen.ts b/src/operator/bufferWhen.ts index 220869d55d..2494ae9587 100644 --- a/src/operator/bufferWhen.ts +++ b/src/operator/bufferWhen.ts @@ -6,6 +6,7 @@ import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -60,7 +61,9 @@ class BufferWhenSubscriber extends OuterSubscriber { this.subscribing = false; } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this.openBuffer(); } diff --git a/src/operator/combineLatest.ts b/src/operator/combineLatest.ts index 47febdc223..876a2af09f 100644 --- a/src/operator/combineLatest.ts +++ b/src/operator/combineLatest.ts @@ -6,6 +6,7 @@ import {isScheduler} from '../util/isScheduler'; import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -116,9 +117,11 @@ export class CombineLatestSubscriber extends OuterSubscriber { } } - notifyNext(observable: any, value: R, outerIndex: number, innerIndex: number) { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { const values = this.values; - values[outerIndex] = value; + values[outerIndex] = innerValue; const toRespond = this.toRespond; if (toRespond.length > 0) { diff --git a/src/operator/debounce.ts b/src/operator/debounce.ts index 581d3b01e8..8be11663b3 100644 --- a/src/operator/debounce.ts +++ b/src/operator/debounce.ts @@ -7,6 +7,7 @@ import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -68,7 +69,9 @@ class DebounceSubscriber extends OuterSubscriber { this.destination.complete(); } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this.emitValue(); } diff --git a/src/operator/exhaustMap.ts b/src/operator/exhaustMap.ts index 830538437d..0dcf12c3b8 100644 --- a/src/operator/exhaustMap.ts +++ b/src/operator/exhaustMap.ts @@ -4,6 +4,7 @@ import {Subscriber} from '../Subscriber'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -64,7 +65,9 @@ class SwitchFirstMapSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { const { resultSelector, destination } = this; if (resultSelector) { const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex); diff --git a/src/operator/expand.ts b/src/operator/expand.ts index 95d600b12d..8fb6661722 100644 --- a/src/operator/expand.ts +++ b/src/operator/expand.ts @@ -6,6 +6,7 @@ import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {Subscription} from '../Subscription'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -96,7 +97,9 @@ export class ExpandSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this._next(innerValue); } diff --git a/src/operator/mergeMap.ts b/src/operator/mergeMap.ts index 71e1b98209..586be05980 100644 --- a/src/operator/mergeMap.ts +++ b/src/operator/mergeMap.ts @@ -4,6 +4,7 @@ import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; import {subscribeToResult} from '../util/subscribeToResult'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; /** * Returns an Observable that emits items based on applying a function that you supply to each item emitted by the @@ -80,7 +81,9 @@ export class MergeMapSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { if (this.resultSelector) { this._notifyResultSelector(outerValue, innerValue, outerIndex, innerIndex); } else { @@ -109,4 +112,4 @@ export class MergeMapSubscriber extends OuterSubscriber { this.destination.complete(); } } -} \ No newline at end of file +} diff --git a/src/operator/mergeMapTo.ts b/src/operator/mergeMapTo.ts index cdcc584e81..da8d1c14c3 100644 --- a/src/operator/mergeMapTo.ts +++ b/src/operator/mergeMapTo.ts @@ -6,6 +6,7 @@ import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {Subscription} from '../Subscription'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; export function mergeMapTo(observable: Observable, @@ -67,7 +68,9 @@ export class MergeMapToSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { const { resultSelector, destination } = this; if (resultSelector) { const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex); diff --git a/src/operator/mergeScan.ts b/src/operator/mergeScan.ts index 28fb5c969e..5652b4c184 100644 --- a/src/operator/mergeScan.ts +++ b/src/operator/mergeScan.ts @@ -6,6 +6,7 @@ import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {subscribeToResult} from '../util/subscribeToResult'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; export function mergeScan(project: (acc: R, value: T) => Observable, seed: R, @@ -70,7 +71,9 @@ export class MergeScanSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { const { destination } = this; this.acc = innerValue; this.hasValue = true; diff --git a/src/operator/race.ts b/src/operator/race.ts index 05e1cbdb6f..0e62eb5f6e 100644 --- a/src/operator/race.ts +++ b/src/operator/race.ts @@ -5,6 +5,7 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -79,7 +80,9 @@ export class RaceSubscriber extends OuterSubscriber { } } - notifyNext(observable: any, value: R, outerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { if (!this.hasFirst) { this.hasFirst = true; @@ -95,6 +98,6 @@ export class RaceSubscriber extends OuterSubscriber { this.subscriptions = null; } - this.destination.next(value); + this.destination.next(innerValue); } } diff --git a/src/operator/retryWhen.ts b/src/operator/retryWhen.ts index ba99da3e5e..3d08c70ea3 100644 --- a/src/operator/retryWhen.ts +++ b/src/operator/retryWhen.ts @@ -7,6 +7,7 @@ import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -92,7 +93,9 @@ class RetryWhenSubscriber extends OuterSubscriber { this.retries = null; } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { const { errors, retries, retriesSubscription } = this; this.errors = null; diff --git a/src/operator/sample.ts b/src/operator/sample.ts index 00ec3440ef..907599c8fd 100644 --- a/src/operator/sample.ts +++ b/src/operator/sample.ts @@ -3,6 +3,7 @@ import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -43,7 +44,9 @@ class SampleSubscriber extends OuterSubscriber { this.hasValue = true; } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this.emitValue(); } diff --git a/src/operator/skipUntil.ts b/src/operator/skipUntil.ts index ff2a1de32c..9844156c72 100644 --- a/src/operator/skipUntil.ts +++ b/src/operator/skipUntil.ts @@ -3,6 +3,7 @@ import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -53,7 +54,9 @@ class SkipUntilSubscriber extends OuterSubscriber { } } - notifyNext(): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this.hasValue = true; } diff --git a/src/operator/switch.ts b/src/operator/switch.ts index 257d581c23..184dfadbed 100644 --- a/src/operator/switch.ts +++ b/src/operator/switch.ts @@ -2,6 +2,7 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -60,7 +61,9 @@ class SwitchSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: any): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this.destination.next(innerValue); } diff --git a/src/operator/switchMap.ts b/src/operator/switchMap.ts index 7440204054..4b85ab0532 100644 --- a/src/operator/switchMap.ts +++ b/src/operator/switchMap.ts @@ -3,6 +3,7 @@ import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -83,7 +84,9 @@ class SwitchMapSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { if (this.resultSelector) { this._tryNotifyNext(outerValue, innerValue, outerIndex, innerIndex); } else { diff --git a/src/operator/switchMapTo.ts b/src/operator/switchMapTo.ts index 2cdd3d1bac..afc6f5029e 100644 --- a/src/operator/switchMapTo.ts +++ b/src/operator/switchMapTo.ts @@ -5,6 +5,7 @@ import {Subscription} from '../Subscription'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; export function switchMapTo(observable: Observable, @@ -63,7 +64,9 @@ class SwitchMapToSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { const { resultSelector, destination } = this; if (resultSelector) { const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex); diff --git a/src/operator/takeUntil.ts b/src/operator/takeUntil.ts index 7ecf4ca97e..030935f9c0 100644 --- a/src/operator/takeUntil.ts +++ b/src/operator/takeUntil.ts @@ -3,6 +3,7 @@ import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; export function takeUntil(notifier: Observable): Observable { @@ -26,7 +27,9 @@ class TakeUntilSubscriber extends OuterSubscriber { this.add(subscribeToResult(this, notifier)); } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this.complete(); } diff --git a/src/operator/throttle.ts b/src/operator/throttle.ts index af35cd18e2..4a859760a8 100644 --- a/src/operator/throttle.ts +++ b/src/operator/throttle.ts @@ -6,6 +6,7 @@ import {Subscription} from '../Subscription'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; export function throttle(durationSelector: (value: T) => Observable | Promise): Observable { @@ -50,7 +51,9 @@ class ThrottleSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this._unsubscribe(); } diff --git a/src/operator/windowToggle.ts b/src/operator/windowToggle.ts index 41b3852b16..491b22e502 100644 --- a/src/operator/windowToggle.ts +++ b/src/operator/windowToggle.ts @@ -8,6 +8,7 @@ import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; export function windowToggle(openings: Observable, @@ -102,7 +103,9 @@ class WindowToggleSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: any, innerValue: any, outerIndex: number, innerIndex: number): void { + notifyNext(outerValue: any, innerValue: any, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { if (outerValue === this.openings) { diff --git a/src/operator/withLatestFrom.ts b/src/operator/withLatestFrom.ts index cdc5d89d5e..3426e221fe 100644 --- a/src/operator/withLatestFrom.ts +++ b/src/operator/withLatestFrom.ts @@ -2,6 +2,7 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; /** @@ -62,11 +63,13 @@ class WithLatestFromSubscriber extends OuterSubscriber { } } - notifyNext(observable: any, value: any, observableIndex: number, index: number) { - this.values[observableIndex] = value; + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.values[outerIndex] = innerValue; const toRespond = this.toRespond; if (toRespond.length > 0) { - const found = toRespond.indexOf(observableIndex); + const found = toRespond.indexOf(outerIndex); if (found !== -1) { toRespond.splice(found, 1); } diff --git a/src/operator/zip.ts b/src/operator/zip.ts index 6b3c6e0241..425933e5a9 100644 --- a/src/operator/zip.ts +++ b/src/operator/zip.ts @@ -5,6 +5,7 @@ import {Operator} from '../Operator'; import {Observer} from '../Observer'; import {Subscriber} from '../Subscriber'; import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; import {SymbolShim} from '../util/SymbolShim'; @@ -237,7 +238,9 @@ class ZipBufferIterator extends OuterSubscriber implements LookAhead } } - notifyNext(outerValue: any, innerValue: any, outerIndex: number, innerIndex: number) { + notifyNext(outerValue: T, innerValue: any, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { this.buffer.push(innerValue); this.parent.checkIterators(); }