From d83264d32ec2f32f26905daaff244e0db5985117 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Thu, 6 Sep 2018 23:22:35 +1000 Subject: [PATCH 01/20] test(mergeMap): add failing test --- spec/operators/mergeMap-spec.ts | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index 05ca08168a..5af05847de 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { mergeMap, map } from 'rxjs/operators'; -import { asapScheduler, defer, Observable, from, of } from 'rxjs'; +import { mergeMap, map, mapTo } from 'rxjs/operators'; +import { asapScheduler, concat, defer, Observable, from, of, timer } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; declare const type: Function; @@ -717,7 +717,7 @@ describe('mergeMap', () => { // Added as a failing test when investigating: // https://github.com/ReactiveX/rxjs/issues/4071 - const results: any[] = []; + const results: (number | string)[] = []; of(1).pipe( mergeMap(() => defer(() => @@ -744,7 +744,7 @@ describe('mergeMap', () => { // Added as a failing test when investigating: // https://github.com/ReactiveX/rxjs/issues/4071 - const results: any[] = []; + const results: (number | string)[] = []; of(1).pipe( mergeMap(() => @@ -764,6 +764,30 @@ describe('mergeMap', () => { }, 0); }); + it('should support wrapped sources', (done: MochaDone) => { + + // Added as a failing test when investigating: + // https://github.com/ReactiveX/rxjs/issues/4095 + + const results: (number | string)[] = []; + + const wrapped = new Observable(subscriber => { + const subscription = timer(0, asapScheduler).pipe(mapTo(42)).subscribe(subscriber); + return () => subscription.unsubscribe(); + }); + wrapped.pipe( + mergeMap(value => concat(of(value), timer(0, asapScheduler).pipe(mapTo(value)))) + ).subscribe({ + next(value) { results.push(value); }, + complete() { results.push('done'); } + }); + + setTimeout(() => { + expect(results).to.deep.equal([42, 42, 'done']); + done(); + }, 0); + }); + type('should support type signatures', () => { let o: Observable; From 83414b3ac431e8270f4420492706e5750d87d94e Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Fri, 7 Sep 2018 05:55:34 +1000 Subject: [PATCH 02/20] fix(subscriber): don't unsubscribe self When unsubscribing a subscriber's parent, make sure that the subscriber itself is not unsubscribed. Closes #4095 --- src/internal/Subscriber.ts | 2 ++ src/internal/Subscription.ts | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index 7f81c5814b..0d6681b10e 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -166,7 +166,9 @@ export class Subscriber extends Subscription implements Observer { /** @deprecated This is an internal implementation detail, do not use. */ _unsubscribeParentSubscription() { if (this._parentSubscription !== null) { + ++this._keepAliveCount; this._parentSubscription.unsubscribe(); + --this._keepAliveCount; } } diff --git a/src/internal/Subscription.ts b/src/internal/Subscription.ts index 52380c0317..163012d192 100644 --- a/src/internal/Subscription.ts +++ b/src/internal/Subscription.ts @@ -36,6 +36,8 @@ export class Subscription implements SubscriptionLike { /** @internal */ protected _parents: Subscription[] = null; /** @internal */ + protected _keepAliveCount: number = 0; + /** @internal */ private _subscriptions: SubscriptionLike[] = null; /** @@ -45,7 +47,6 @@ export class Subscription implements SubscriptionLike { constructor(unsubscribe?: () => void) { if (unsubscribe) { ( this)._unsubscribe = unsubscribe; - } } @@ -59,7 +60,7 @@ export class Subscription implements SubscriptionLike { let hasErrors = false; let errors: any[]; - if (this.closed) { + if (this.closed || this._keepAliveCount > 0) { return; } From 80e52eee74121354a3f92ba6343ae61cbbe4c8ef Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Fri, 14 Sep 2018 17:00:41 +1000 Subject: [PATCH 03/20] refactor(mergeMap): simplify --- spec/operators/mergeMap-spec.ts | 2 +- src/internal/Observable.ts | 2 +- src/internal/Subscriber.ts | 17 ++--------------- src/internal/Subscription.ts | 4 +--- src/internal/operators/mergeMap.ts | 4 +++- 5 files changed, 8 insertions(+), 21 deletions(-) diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index 5af05847de..8b80abf4ab 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -7,7 +7,7 @@ declare const type: Function; declare const asDiagram: Function; /** @test {mergeMap} */ -describe('mergeMap', () => { +describe.only('mergeMap', () => { asDiagram('mergeMap(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014| )') ('should map-and-flatten each item to an Observable', () => { const e1 = hot('--1-----3--5-------|'); diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 78b7506190..01669975c5 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -199,7 +199,7 @@ export class Observable implements Subscribable { if (operator) { operator.call(sink, this.source); } else { - sink._addParentTeardownLogic( + sink.add( this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ? this._subscribe(sink) : this._trySubscribe(sink) diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index 0d6681b10e..8f5a0ccf81 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -78,7 +78,7 @@ export class Subscriber extends Subscription implements Observer { const trustedSubscriber = destinationOrNext[rxSubscriberSymbol]() as Subscriber; this.syncErrorThrowable = trustedSubscriber.syncErrorThrowable; this.destination = trustedSubscriber; - trustedSubscriber._addParentTeardownLogic(this); + trustedSubscriber.add(this); } else { this.syncErrorThrowable = true; this.destination = new SafeSubscriber(this, > destinationOrNext); @@ -116,7 +116,6 @@ export class Subscriber extends Subscription implements Observer { if (!this.isStopped) { this.isStopped = true; this._error(err); - this._unsubscribeParentSubscription(); } } @@ -130,7 +129,6 @@ export class Subscriber extends Subscription implements Observer { if (!this.isStopped) { this.isStopped = true; this._complete(); - this._unsubscribeParentSubscription(); } } @@ -158,18 +156,7 @@ export class Subscriber extends Subscription implements Observer { /** @deprecated This is an internal implementation detail, do not use. */ _addParentTeardownLogic(parentTeardownLogic: TeardownLogic) { - if (parentTeardownLogic !== this) { - this._parentSubscription = this.add(parentTeardownLogic); - } - } - - /** @deprecated This is an internal implementation detail, do not use. */ - _unsubscribeParentSubscription() { - if (this._parentSubscription !== null) { - ++this._keepAliveCount; - this._parentSubscription.unsubscribe(); - --this._keepAliveCount; - } + /*noop*/ } /** @deprecated This is an internal implementation detail, do not use. */ diff --git a/src/internal/Subscription.ts b/src/internal/Subscription.ts index 163012d192..3faac56f87 100644 --- a/src/internal/Subscription.ts +++ b/src/internal/Subscription.ts @@ -36,8 +36,6 @@ export class Subscription implements SubscriptionLike { /** @internal */ protected _parents: Subscription[] = null; /** @internal */ - protected _keepAliveCount: number = 0; - /** @internal */ private _subscriptions: SubscriptionLike[] = null; /** @@ -60,7 +58,7 @@ export class Subscription implements SubscriptionLike { let hasErrors = false; let errors: any[]; - if (this.closed || this._keepAliveCount > 0) { + if (this.closed) { return; } diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 3a2b9bc950..d45d6baf49 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -140,7 +140,8 @@ export class MergeMapSubscriber extends OuterSubscriber { private _innerSub(ish: ObservableInput, value: T, index: number): void { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as any as Subscription; + destination.add(innerSubscriber); subscribeToResult(this, ish, value, index, innerSubscriber); } @@ -149,6 +150,7 @@ export class MergeMapSubscriber extends OuterSubscriber { if (this.active === 0 && this.buffer.length === 0) { this.destination.complete(); } + this.unsubscribe(); } notifyNext(outerValue: T, innerValue: R, From bc75ee2b08dfe4e8cd408f523239b5a16fbb512e Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 05:37:43 +1000 Subject: [PATCH 04/20] chore(typings): use union type for destination --- src/internal/Subscriber.ts | 2 +- src/internal/operators/mergeMap.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index 8f5a0ccf81..6008ffcf4f 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -45,7 +45,7 @@ export class Subscriber extends Subscription implements Observer { /** @internal */ syncErrorThrowable: boolean = false; protected isStopped: boolean = false; - protected destination: PartialObserver; // this `any` is the escape hatch to erase extra type param (e.g. R) + protected destination: PartialObserver | Subscriber; // this `any` is the escape hatch to erase extra type param (e.g. R) private _parentSubscription: Subscription | null = null; diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index d45d6baf49..d843dcdc97 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -140,7 +140,7 @@ export class MergeMapSubscriber extends OuterSubscriber { private _innerSub(ish: ObservableInput, value: T, index: number): void { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - const destination = this.destination as any as Subscription; + const destination = this.destination as Subscription; destination.add(innerSubscriber); subscribeToResult(this, ish, value, index, innerSubscriber); } From 55e60d67f58880ff2a3ba7126b6c41b45d2985ea Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 06:26:47 +1000 Subject: [PATCH 05/20] chore(test): remove only --- spec/operators/mergeMap-spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index 8b80abf4ab..5af05847de 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -7,7 +7,7 @@ declare const type: Function; declare const asDiagram: Function; /** @test {mergeMap} */ -describe.only('mergeMap', () => { +describe('mergeMap', () => { asDiagram('mergeMap(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014| )') ('should map-and-flatten each item to an Observable', () => { const e1 = hot('--1-----3--5-------|'); From c2395e62cdc166f0ce25aeb36d74cff14c6f520a Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 06:51:53 +1000 Subject: [PATCH 06/20] chore(test): use pipe --- spec/operators/switch-spec.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spec/operators/switch-spec.ts b/spec/operators/switch-spec.ts index 6c1a9cb72b..0e0b391c7a 100644 --- a/spec/operators/switch-spec.ts +++ b/spec/operators/switch-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { Observable, of, NEVER, queueScheduler, Subject } from 'rxjs'; -import { switchAll } from 'rxjs/operators'; +import { map, switchAll } from 'rxjs/operators'; declare function asDiagram(arg: string): Function; declare const type: Function; @@ -222,9 +222,9 @@ describe('switchAll', () => { it('should not leak when child completes before each switch (prevent memory leaks #2355)', () => { let iStream: Subject; const oStreamControl = new Subject(); - const oStream = oStreamControl.map(() => { - return (iStream = new Subject()); - }); + const oStream = oStreamControl.pipe( + map(() => (iStream = new Subject())) + ); const switcher = oStream.pipe(switchAll()); const result: number[] = []; let sub = switcher.subscribe((x) => result.push(x)); @@ -242,9 +242,9 @@ describe('switchAll', () => { it('should not leak if we switch before child completes (prevent memory leaks #2355)', () => { const oStreamControl = new Subject(); - const oStream = oStreamControl.map(() => { - return (new Subject()); - }); + const oStream = oStreamControl.pipe( + map(() => new Subject()) + ); const switcher = oStream.pipe(switchAll()); const result: number[] = []; let sub = switcher.subscribe((x) => result.push(x)); From 1d02c73fbf802e6aba8740f29bac2f9cca0d4a1b Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:53:07 +1000 Subject: [PATCH 07/20] test(Subscriber): fake add too --- spec/Subscriber-spec.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spec/Subscriber-spec.ts b/spec/Subscriber-spec.ts index bdac644988..a78959f39c 100644 --- a/spec/Subscriber-spec.ts +++ b/spec/Subscriber-spec.ts @@ -23,7 +23,8 @@ describe('Subscriber', () => { it('should accept subscribers as a destination if they meet the proper criteria', () => { const fakeSubscriber = { [rxSubscriber](this: any) { return this; }, - _addParentTeardownLogic() { /* noop */ } + _addParentTeardownLogic() { /* noop */ }, + add() { /* noop */ } }; const subscriber = new Subscriber(fakeSubscriber as any); From d383d24dfb88051357ddacfd06134dbcb09d0f70 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:53:47 +1000 Subject: [PATCH 08/20] test(internals): update for subscription changes --- spec/operators/observeOn-spec.ts | 20 +++++++++----------- spec/operators/switch-spec.ts | 7 ++++++- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/spec/operators/observeOn-spec.ts b/spec/operators/observeOn-spec.ts index ed218db90c..a0771ef08f 100644 --- a/spec/operators/observeOn-spec.ts +++ b/spec/operators/observeOn-spec.ts @@ -103,23 +103,21 @@ describe('observeOn operator', () => { .pipe(observeOn(asapScheduler)) .subscribe( x => { - const observeOnSubscriber = subscription._subscriptions[0]; - expect(observeOnSubscriber._subscriptions.length).to.equal(2); // one for the consumer, and one for the notification - expect(observeOnSubscriber._subscriptions[1].state.notification.kind) - .to.equal('N'); - expect(observeOnSubscriber._subscriptions[1].state.notification.value) - .to.equal(x); + // see #4106 - inner subscriptions are now added to destinations + // so the subscription will contain an ObserveOnSubscriber and a subscription for the scheduled action + expect(subscription._subscriptions.length).to.equal(2); + const actionSubscription = subscription._subscriptions[1]; + expect(actionSubscription.state.notification.kind).to.equal('N'); + expect(actionSubscription.state.notification.value).to.equal(x); results.push(x); }, err => done(err), () => { // now that the last nexted value is done, there should only be a complete notification scheduled // the consumer will have been unsubscribed via Subscriber#_parentSubscription - const observeOnSubscriber = subscription._subscriptions[0]; - expect(observeOnSubscriber._subscriptions.length).to.equal(1); // one for the complete notification - // only this completion notification should remain. - expect(observeOnSubscriber._subscriptions[0].state.notification.kind) - .to.equal('C'); + expect(subscription._subscriptions.length).to.equal(1); + const actionSubscription = subscription._subscriptions[0]; + expect(actionSubscription.state.notification.kind).to.equal('C'); // After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this. expect(results).to.deep.equal([1, 2, 3]); done(); diff --git a/spec/operators/switch-spec.ts b/spec/operators/switch-spec.ts index 0e0b391c7a..b6bf4c4cab 100644 --- a/spec/operators/switch-spec.ts +++ b/spec/operators/switch-spec.ts @@ -252,9 +252,14 @@ describe('switchAll', () => { [0, 1, 2, 3, 4].forEach((n) => { oStreamControl.next(n); // creates inner }); - // Expect two children of switch(): The oStream and the first inner + // Expect one child of switch(): The oStream expect( (sub as any)._subscriptions[0]._subscriptions.length + ).to.equal(1); + // Expect two children of subscribe(): The destination and the first inner + // See #4106 - inner subscriptions are now added to destinations + expect( + (sub as any)._subscriptions.length ).to.equal(2); sub.unsubscribe(); }); From b12c14c816c8336e2d563b8a4090b2070a8b1097 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:54:48 +1000 Subject: [PATCH 09/20] refactor(zip): add to destination --- src/internal/observable/zip.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/internal/observable/zip.ts b/src/internal/observable/zip.ts index cea051a3d1..ddae0fa9c3 100644 --- a/src/internal/observable/zip.ts +++ b/src/internal/observable/zip.ts @@ -4,6 +4,7 @@ import { isArray } from '../util/isArray'; import { Operator } from '../Operator'; import { ObservableInput, PartialObserver } from '../types'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; @@ -126,6 +127,8 @@ export class ZipSubscriber extends Subscriber { const iterators = this.iterators; const len = iterators.length; + this.unsubscribe(); + if (len === 0) { this.destination.complete(); return; @@ -135,7 +138,8 @@ export class ZipSubscriber extends Subscriber { for (let i = 0; i < len; i++) { let iterator: ZipBufferIterator = iterators[i]; if (iterator.stillUnsubscribed) { - this.add(iterator.subscribe(iterator, i)); + const destination = this.destination as Subscription; + destination.add(iterator.subscribe(iterator, i)); } else { this.active--; // not an observable } From 0e33a56355f110fa8251772377a34f94557dbac2 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:55:13 +1000 Subject: [PATCH 10/20] refactor(delay): add to destination --- src/internal/operators/delay.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/internal/operators/delay.ts b/src/internal/operators/delay.ts index 6ef71a4b64..3689e60f01 100644 --- a/src/internal/operators/delay.ts +++ b/src/internal/operators/delay.ts @@ -2,6 +2,7 @@ import { async } from '../scheduler/async'; import { isDate } from '../util/isDate'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { Notification } from '../Notification'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, PartialObserver, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; @@ -110,7 +111,8 @@ class DelaySubscriber extends Subscriber { private _schedule(scheduler: SchedulerLike): void { this.active = true; - this.add(scheduler.schedule>(DelaySubscriber.dispatch, this.delay, { + const destination = this.destination as Subscription; + destination.add(scheduler.schedule>(DelaySubscriber.dispatch, this.delay, { source: this, destination: this.destination, scheduler: scheduler })); } @@ -137,10 +139,12 @@ class DelaySubscriber extends Subscriber { this.errored = true; this.queue = []; this.destination.error(err); + this.unsubscribe(); } protected _complete() { this.scheduleNotification(Notification.createComplete()); + this.unsubscribe(); } } From 456baa9c6517bb5cc01fa948b52773ba45a13bb3 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:55:33 +1000 Subject: [PATCH 11/20] refactor(delayWhen): add to destination --- src/internal/operators/delayWhen.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/internal/operators/delayWhen.ts b/src/internal/operators/delayWhen.ts index e8c071b890..d22f26bad4 100644 --- a/src/internal/operators/delayWhen.ts +++ b/src/internal/operators/delayWhen.ts @@ -132,6 +132,7 @@ class DelayWhenSubscriber extends OuterSubscriber { protected _complete(): void { this.completed = true; this.tryComplete(); + this.unsubscribe(); } private removeSubscription(subscription: InnerSubscriber): T { @@ -149,7 +150,8 @@ class DelayWhenSubscriber extends OuterSubscriber { const notifierSubscription = subscribeToResult(this, delayNotifier, value); if (notifierSubscription && !notifierSubscription.closed) { - this.add(notifierSubscription); + const destination = this.destination as Subscription; + destination.add(notifierSubscription); this.delayNotifierSubscriptions.push(notifierSubscription); } } @@ -199,6 +201,7 @@ class SubscriptionDelaySubscriber extends Subscriber { } protected _complete() { + this.unsubscribe(); this.subscribeToSource(); } From c721cd4d4bd21f2f1f7fef428c38e6b212c43029 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:55:54 +1000 Subject: [PATCH 12/20] refactor(exhaustMap): add to destination --- src/internal/operators/exhaustMap.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 67a6ee8bab..37b4e113cf 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -120,7 +120,8 @@ class ExhaustMapSubscriber extends OuterSubscriber { private _innerSub(result: ObservableInput, value: T, index: number): void { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); subscribeToResult(this, result, value, index, innerSubscriber); } @@ -129,6 +130,7 @@ class ExhaustMapSubscriber extends OuterSubscriber { if (!this.hasSubscription) { this.destination.complete(); } + this.unsubscribe(); } notifyNext(outerValue: T, innerValue: R, @@ -142,7 +144,8 @@ class ExhaustMapSubscriber extends OuterSubscriber { } notifyComplete(innerSub: Subscription): void { - this.remove(innerSub); + const destination = this.destination as Subscription; + destination.remove(innerSub); this.hasSubscription = false; if (this.hasCompleted) { From 21b73b0ca17d32fb3fdefb5e8b85ccf09bf6be79 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:56:37 +1000 Subject: [PATCH 13/20] refactor(expand): add to destination --- src/internal/operators/expand.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/internal/operators/expand.ts b/src/internal/operators/expand.ts index 80c425a96a..33dde3bb38 100644 --- a/src/internal/operators/expand.ts +++ b/src/internal/operators/expand.ts @@ -133,7 +133,8 @@ export class ExpandSubscriber extends OuterSubscriber { this.subscribeToProjection(result, value, index); } else { const state: DispatchArg = { subscriber: this, result, value, index }; - this.add(this.scheduler.schedule>(ExpandSubscriber.dispatch, 0, state)); + const destination = this.destination as Subscription; + destination.add(this.scheduler.schedule>(ExpandSubscriber.dispatch, 0, state)); } } else { this.buffer.push(value); @@ -142,7 +143,8 @@ export class ExpandSubscriber extends OuterSubscriber { private subscribeToProjection(result: any, value: T, index: number): void { this.active++; - this.add(subscribeToResult(this, result, value, index)); + const destination = this.destination as Subscription; + destination.add(subscribeToResult(this, result, value, index)); } protected _complete(): void { @@ -150,6 +152,7 @@ export class ExpandSubscriber extends OuterSubscriber { if (this.hasCompleted && this.active === 0) { this.destination.complete(); } + this.unsubscribe(); } notifyNext(outerValue: T, innerValue: R, @@ -160,7 +163,8 @@ export class ExpandSubscriber extends OuterSubscriber { notifyComplete(innerSub: Subscription): void { const buffer = this.buffer; - this.remove(innerSub); + const destination = this.destination as Subscription; + destination.remove(innerSub); this.active--; if (buffer && buffer.length > 0) { this._next(buffer.shift()); From 26d070b85b5b679fbbc05039c6bd8c8e771dbd45 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:56:56 +1000 Subject: [PATCH 14/20] refactor(mergeScan): add to destination --- src/internal/operators/mergeScan.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 5c34d03a31..42be3d81c2 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -101,7 +101,8 @@ export class MergeScanSubscriber extends OuterSubscriber { private _innerSub(ish: any, value: T, index: number): void { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); subscribeToResult(this, ish, value, index, innerSubscriber); } @@ -113,6 +114,7 @@ export class MergeScanSubscriber extends OuterSubscriber { } this.destination.complete(); } + this.unsubscribe(); } notifyNext(outerValue: T, innerValue: R, @@ -126,7 +128,8 @@ export class MergeScanSubscriber extends OuterSubscriber { notifyComplete(innerSub: Subscription): void { const buffer = this.buffer; - this.remove(innerSub); + const destination = this.destination as Subscription; + destination.remove(innerSub); this.active--; if (buffer.length > 0) { this._next(buffer.shift()); From dad60bffd33eef7d7c80c9243acb3603fdbf9c90 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:57:15 +1000 Subject: [PATCH 15/20] refactor(observeOn): add to destination --- src/internal/operators/observeOn.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/internal/operators/observeOn.ts b/src/internal/operators/observeOn.ts index 3813ce5c76..d06fe3c4d4 100644 --- a/src/internal/operators/observeOn.ts +++ b/src/internal/operators/observeOn.ts @@ -1,6 +1,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { Notification } from '../Notification'; import { MonoTypeOperatorFunction, PartialObserver, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; @@ -88,7 +89,8 @@ export class ObserveOnSubscriber extends Subscriber { } private scheduleMessage(notification: Notification): void { - this.add(this.scheduler.schedule( + const destination = this.destination as Subscription; + destination.add(this.scheduler.schedule( ObserveOnSubscriber.dispatch, this.delay, new ObserveOnMessage(notification, this.destination) @@ -101,10 +103,12 @@ export class ObserveOnSubscriber extends Subscriber { protected _error(err: any): void { this.scheduleMessage(Notification.createError(err)); + this.unsubscribe(); } protected _complete(): void { this.scheduleMessage(Notification.createComplete()); + this.unsubscribe(); } } From e226a774cef72fcbfa5c105c9b121776434363f9 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:57:38 +1000 Subject: [PATCH 16/20] refactor(onErrorResumeNext): add to destination --- src/internal/operators/onErrorResumeNext.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index d3197e5698..7c37de54c9 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -2,6 +2,7 @@ import { Observable } from '../Observable'; import { from } from '../observable/from'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { isArray } from '../util/isArray'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; @@ -143,17 +144,20 @@ class OnErrorResumeNextSubscriber extends OuterSubscriber { protected _error(err: any): void { this.subscribeToNextSource(); + this.unsubscribe(); } protected _complete(): void { this.subscribeToNextSource(); + this.unsubscribe(); } private subscribeToNextSource(): void { const next = this.nextSources.shift(); if (next) { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); subscribeToResult(this, next, undefined, undefined, innerSubscriber); } else { this.destination.complete(); From 0044084c65ee2be76b1b9835713a6325865653f3 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:58:04 +1000 Subject: [PATCH 17/20] refactor(sequenceEqual): add to destination --- src/internal/operators/sequenceEqual.ts | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/internal/operators/sequenceEqual.ts b/src/internal/operators/sequenceEqual.ts index 100d9d5985..117b42bdb6 100644 --- a/src/internal/operators/sequenceEqual.ts +++ b/src/internal/operators/sequenceEqual.ts @@ -1,6 +1,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { tryCatch } from '../util/tryCatch'; import { errorObject } from '../util/errorObject'; @@ -89,7 +90,7 @@ export class SequenceEqualSubscriber extends Subscriber { private compareTo: Observable, private comparor: (a: T, b: T) => boolean) { super(destination); - this.add(compareTo.subscribe(new SequenceEqualCompareToSubscriber(destination, this))); + (this.destination as Subscription).add(compareTo.subscribe(new SequenceEqualCompareToSubscriber(destination, this))); } protected _next(value: T): void { @@ -107,6 +108,7 @@ export class SequenceEqualSubscriber extends Subscriber { } else { this._oneComplete = true; } + this.unsubscribe(); } checkValues() { @@ -143,6 +145,14 @@ export class SequenceEqualSubscriber extends Subscriber { this.checkValues(); } } + + completeB() { + if (this._oneComplete) { + this.emit(this._a.length === 0 && this._b.length === 0); + } else { + this._oneComplete = true; + } + } } class SequenceEqualCompareToSubscriber extends Subscriber { @@ -156,9 +166,11 @@ class SequenceEqualCompareToSubscriber extends Subscriber { protected _error(err: any): void { this.parent.error(err); + this.unsubscribe(); } protected _complete(): void { - this.parent._complete(); + this.parent.completeB(); + this.unsubscribe(); } } From 9918ab2710fc6097f888e30c2d402e961c82c6c1 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 07:58:31 +1000 Subject: [PATCH 18/20] refactor(switchMap): add to destination --- src/internal/operators/switchMap.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index ee83978c08..0008c379ef 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -114,7 +114,8 @@ class SwitchMapSubscriber extends OuterSubscriber { innerSubscription.unsubscribe(); } const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); this.innerSubscription = subscribeToResult(this, result, value, index, innerSubscriber); } @@ -123,6 +124,7 @@ class SwitchMapSubscriber extends OuterSubscriber { if (!innerSubscription || innerSubscription.closed) { super._complete(); } + this.unsubscribe(); } protected _unsubscribe() { @@ -130,7 +132,8 @@ class SwitchMapSubscriber extends OuterSubscriber { } notifyComplete(innerSub: Subscription): void { - this.remove(innerSub); + const destination = this.destination as Subscription; + destination.remove(innerSub); this.innerSubscription = null; if (this.isStopped) { super._complete(); From 4eb3f4423db551927166573aff6762431abcac94 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 Sep 2018 08:02:41 +1000 Subject: [PATCH 19/20] chore(Subscriber): remove _addParentTeardownLogic --- spec/Subscriber-spec.ts | 4 ++-- src/internal/Subscriber.ts | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/spec/Subscriber-spec.ts b/spec/Subscriber-spec.ts index a78959f39c..a885fffaf3 100644 --- a/spec/Subscriber-spec.ts +++ b/spec/Subscriber-spec.ts @@ -23,8 +23,8 @@ describe('Subscriber', () => { it('should accept subscribers as a destination if they meet the proper criteria', () => { const fakeSubscriber = { [rxSubscriber](this: any) { return this; }, - _addParentTeardownLogic() { /* noop */ }, - add() { /* noop */ } + add() { /* noop */ }, + syncErrorThrowable: false }; const subscriber = new Subscriber(fakeSubscriber as any); diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index 6008ffcf4f..86c824ea41 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -154,11 +154,6 @@ export class Subscriber extends Subscription implements Observer { this.unsubscribe(); } - /** @deprecated This is an internal implementation detail, do not use. */ - _addParentTeardownLogic(parentTeardownLogic: TeardownLogic) { - /*noop*/ - } - /** @deprecated This is an internal implementation detail, do not use. */ _unsubscribeAndRecycle(): Subscriber { const { _parent, _parents } = this; @@ -315,5 +310,5 @@ export class SafeSubscriber extends Subscriber { } export function isTrustedSubscriber(obj: any) { - return obj instanceof Subscriber || ('_addParentTeardownLogic' in obj && obj[rxSubscriberSymbol]); + return obj instanceof Subscriber || ('syncErrorThrowable' in obj && obj[rxSubscriberSymbol]); } From b817f539b9696bd7afd0dfab8aeb1ee0b5278c44 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 22 Sep 2018 09:41:51 +1000 Subject: [PATCH 20/20] chore(test): simplify mergeMap test Remove the mapTo and the concat to reduce the number of subscribers to make the test easier to reason with. --- spec/operators/mergeMap-spec.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index 5af05847de..cb21aff151 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { mergeMap, map, mapTo } from 'rxjs/operators'; -import { asapScheduler, concat, defer, Observable, from, of, timer } from 'rxjs'; +import { mergeMap, map } from 'rxjs/operators'; +import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; declare const type: Function; @@ -772,18 +772,18 @@ describe('mergeMap', () => { const results: (number | string)[] = []; const wrapped = new Observable(subscriber => { - const subscription = timer(0, asapScheduler).pipe(mapTo(42)).subscribe(subscriber); + const subscription = timer(0, asapScheduler).subscribe(subscriber); return () => subscription.unsubscribe(); }); wrapped.pipe( - mergeMap(value => concat(of(value), timer(0, asapScheduler).pipe(mapTo(value)))) + mergeMap(() => timer(0, asapScheduler)) ).subscribe({ next(value) { results.push(value); }, complete() { results.push('done'); } }); setTimeout(() => { - expect(results).to.deep.equal([42, 42, 'done']); + expect(results).to.deep.equal([0, 'done']); done(); }, 0); });