diff --git a/spec/Subscription-spec.ts b/spec/Subscription-spec.ts index 8eb9438dee..c92a3d4aa8 100644 --- a/spec/Subscription-spec.ts +++ b/spec/Subscription-spec.ts @@ -112,15 +112,19 @@ describe('Subscription', () => { expect(isCalled).to.equal(true); }); - it('Should returns the passed one if passed a unsubscribed AnonymousSubscription', () => { - const sub = new Subscription(); + it('Should wrap the AnonymousSubscription and return a subscription that unsubscribes and removes it when unsubbed', () => { + const sub: any = new Subscription(); + let called = false; const arg = { - isUnsubscribed: true, - unsubscribe: () => undefined, + unsubscribe: () => called = true, }; const ret = sub.add(arg); - expect(ret).to.equal(arg); + expect(called).to.equal(false); + expect(sub._subscriptions.length).to.equal(1); + ret.unsubscribe(); + expect(called).to.equal(true); + expect(sub._subscriptions.length).to.equal(0); }); it('Should returns the passed one if passed a AnonymousSubscription having not function `unsubscribe` member', () => { diff --git a/spec/operators/observeOn-spec.ts b/spec/operators/observeOn-spec.ts index 7ad8352656..431973b917 100644 --- a/spec/operators/observeOn-spec.ts +++ b/spec/operators/observeOn-spec.ts @@ -1,4 +1,6 @@ import * as Rx from '../../dist/cjs/Rx'; +import { expect } from 'chai'; + declare const {hot, asDiagram, expectObservable, expectSubscriptions}; declare const rxTestScheduler: Rx.TestScheduler; @@ -77,4 +79,46 @@ describe('Observable.prototype.observeOn', () => { expectObservable(result, unsub).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(sub); }); + + it('should clean up subscriptions created by async scheduling (prevent memory leaks #2244)', (done) => { + //HACK: Deep introspection to make sure we're cleaning up notifications in scheduling. + // as the architecture changes, this test may become brittle. + const results = []; + const subscription: any = new Observable(observer => { + let i = 1; + const id = setInterval(() => { + if (i > 3) { + observer.complete(); + } else { + observer.next(i++); + } + }, 0); + + return () => clearInterval(id); + }) + .observeOn(Rx.Scheduler.asap) + .subscribe( + x => { + const observeOnSubscriber = subscription._subscriptions[0]._innerSub; + expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification + expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind) + .to.equal('N'); + expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value) + .to.equal(x); + results.push(x); + }, + err => done(err), + () => { + // now that the last nexted value is done, there should only be a complete notification scheduled + const observeOnSubscriber = subscription._subscriptions[0]._innerSub; + expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification + // only this completion notification should remain. + expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind) + .to.equal('C'); + // After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this. + expect(results).to.deep.equal([1, 2, 3]); + done(); + } + ); + }); }); diff --git a/src/Subscription.ts b/src/Subscription.ts index f348bfb283..3fcf43c5a7 100644 --- a/src/Subscription.ts +++ b/src/Subscription.ts @@ -40,6 +40,8 @@ export class Subscription implements ISubscription { */ public closed: boolean = false; + private _subscriptions: ISubscription[]; + /** * @param {function(): void} [unsubscribe] A function describing how to * perform the disposal of resources when the `unsubscribe` method is called. @@ -74,7 +76,10 @@ export class Subscription implements ISubscription { let trial = tryCatch(_unsubscribe).call(this); if (trial === errorObject) { hasErrors = true; - (errors = errors || []).push(errorObject.e); + errors = errors || ( + errorObject.e instanceof UnsubscriptionError ? + flattenUnsubscriptionErrors(errorObject.e.errors) : [errorObject.e] + ); } } @@ -92,7 +97,7 @@ export class Subscription implements ISubscription { errors = errors || []; let err = errorObject.e; if (err instanceof UnsubscriptionError) { - errors = errors.concat(err.errors); + errors = errors.concat(flattenUnsubscriptionErrors(err.errors)); } else { errors.push(err); } @@ -140,18 +145,20 @@ export class Subscription implements ISubscription { sub = new Subscription(<(() => void) > teardown); case 'object': if (sub.closed || typeof sub.unsubscribe !== 'function') { - break; + return sub; } else if (this.closed) { sub.unsubscribe(); - } else { - (( this)._subscriptions || (( this)._subscriptions = [])).push(sub); + return sub; } break; default: throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.'); } - return sub; + const childSub = new ChildSubscription(sub, this); + this._subscriptions = this._subscriptions || []; + this._subscriptions.push(childSub); + return childSub; } /** @@ -179,3 +186,19 @@ export class Subscription implements ISubscription { } } } + +export class ChildSubscription extends Subscription { + constructor(private _innerSub: ISubscription, private _parent: Subscription) { + super(); + } + + _unsubscribe() { + const { _innerSub, _parent } = this; + _parent.remove(this); + _innerSub.unsubscribe(); + } +} + +function flattenUnsubscriptionErrors(errors: any[]) { + return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []); +} \ No newline at end of file diff --git a/src/operator/observeOn.ts b/src/operator/observeOn.ts index 88d070ed85..5694cd4a19 100644 --- a/src/operator/observeOn.ts +++ b/src/operator/observeOn.ts @@ -4,7 +4,8 @@ import { Operator } from '../Operator'; import { PartialObserver } from '../Observer'; import { Subscriber } from '../Subscriber'; import { Notification } from '../Notification'; -import { TeardownLogic } from '../Subscription'; +import { TeardownLogic, Subscription } from '../Subscription'; +import { Action } from '../scheduler/Action'; /** * @see {@link Notification} @@ -34,9 +35,12 @@ export class ObserveOnOperator implements Operator { * @extends {Ignored} */ export class ObserveOnSubscriber extends Subscriber { - static dispatch(arg: ObserveOnMessage) { - const { notification, destination } = arg; + static dispatch(this: Action, arg: ObserveOnMessage) { + const { notification, destination, subscription } = arg; notification.observe(destination); + if (subscription) { + subscription.unsubscribe(); + } } constructor(destination: Subscriber, @@ -46,10 +50,11 @@ export class ObserveOnSubscriber extends Subscriber { } private scheduleMessage(notification: Notification): void { - this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, - this.delay, - new ObserveOnMessage(notification, this.destination))); - } + const message = new ObserveOnMessage(notification, this.destination); + message.subscription = this.add( + this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message) + ); + } protected _next(value: T): void { this.scheduleMessage(Notification.createNext(value)); @@ -65,6 +70,8 @@ export class ObserveOnSubscriber extends Subscriber { } export class ObserveOnMessage { + public subscription: Subscription; + constructor(public notification: Notification, public destination: PartialObserver) { } diff --git a/src/scheduler/VirtualTimeScheduler.ts b/src/scheduler/VirtualTimeScheduler.ts index 21279b9696..6cd31bc19f 100644 --- a/src/scheduler/VirtualTimeScheduler.ts +++ b/src/scheduler/VirtualTimeScheduler.ts @@ -54,15 +54,17 @@ export class VirtualAction extends AsyncAction { } public schedule(state?: T, delay: number = 0): Subscription { - return !this.id ? - super.schedule(state, delay) : ( - // If an action is rescheduled, we save allocations by mutating its state, - // pushing it to the end of the scheduler queue, and recycling the action. - // But since the VirtualTimeScheduler is used for testing, VirtualActions - // must be immutable so they can be inspected later. - > this.add( - new VirtualAction(this.scheduler, this.work)) - ).schedule(state, delay); + if (!this.id) { + return super.schedule(state, delay); + } + + // If an action is rescheduled, we save allocations by mutating its state, + // pushing it to the end of the scheduler queue, and recycling the action. + // But since the VirtualTimeScheduler is used for testing, VirtualActions + // must be immutable so they can be inspected later. + const action = new VirtualAction(this.scheduler, this.work); + this.add(action); + return action.schedule(state, delay); } protected requestAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay: number = 0): any {