diff --git a/spec/operators/observeOn-spec.ts b/spec/operators/observeOn-spec.ts index 7ad8352656..431973b917 100644 --- a/spec/operators/observeOn-spec.ts +++ b/spec/operators/observeOn-spec.ts @@ -1,4 +1,6 @@ import * as Rx from '../../dist/cjs/Rx'; +import { expect } from 'chai'; + declare const {hot, asDiagram, expectObservable, expectSubscriptions}; declare const rxTestScheduler: Rx.TestScheduler; @@ -77,4 +79,46 @@ describe('Observable.prototype.observeOn', () => { expectObservable(result, unsub).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(sub); }); + + it('should clean up subscriptions created by async scheduling (prevent memory leaks #2244)', (done) => { + //HACK: Deep introspection to make sure we're cleaning up notifications in scheduling. + // as the architecture changes, this test may become brittle. + const results = []; + const subscription: any = new Observable(observer => { + let i = 1; + const id = setInterval(() => { + if (i > 3) { + observer.complete(); + } else { + observer.next(i++); + } + }, 0); + + return () => clearInterval(id); + }) + .observeOn(Rx.Scheduler.asap) + .subscribe( + x => { + const observeOnSubscriber = subscription._subscriptions[0]._innerSub; + expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification + expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind) + .to.equal('N'); + expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value) + .to.equal(x); + results.push(x); + }, + err => done(err), + () => { + // now that the last nexted value is done, there should only be a complete notification scheduled + const observeOnSubscriber = subscription._subscriptions[0]._innerSub; + expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification + // only this completion notification should remain. + expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind) + .to.equal('C'); + // After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this. + expect(results).to.deep.equal([1, 2, 3]); + done(); + } + ); + }); }); diff --git a/src/operator/observeOn.ts b/src/operator/observeOn.ts index 88d070ed85..5694cd4a19 100644 --- a/src/operator/observeOn.ts +++ b/src/operator/observeOn.ts @@ -4,7 +4,8 @@ import { Operator } from '../Operator'; import { PartialObserver } from '../Observer'; import { Subscriber } from '../Subscriber'; import { Notification } from '../Notification'; -import { TeardownLogic } from '../Subscription'; +import { TeardownLogic, Subscription } from '../Subscription'; +import { Action } from '../scheduler/Action'; /** * @see {@link Notification} @@ -34,9 +35,12 @@ export class ObserveOnOperator implements Operator { * @extends {Ignored} */ export class ObserveOnSubscriber extends Subscriber { - static dispatch(arg: ObserveOnMessage) { - const { notification, destination } = arg; + static dispatch(this: Action, arg: ObserveOnMessage) { + const { notification, destination, subscription } = arg; notification.observe(destination); + if (subscription) { + subscription.unsubscribe(); + } } constructor(destination: Subscriber, @@ -46,10 +50,11 @@ export class ObserveOnSubscriber extends Subscriber { } private scheduleMessage(notification: Notification): void { - this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, - this.delay, - new ObserveOnMessage(notification, this.destination))); - } + const message = new ObserveOnMessage(notification, this.destination); + message.subscription = this.add( + this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message) + ); + } protected _next(value: T): void { this.scheduleMessage(Notification.createNext(value)); @@ -65,6 +70,8 @@ export class ObserveOnSubscriber extends Subscriber { } export class ObserveOnMessage { + public subscription: Subscription; + constructor(public notification: Notification, public destination: PartialObserver) { } diff --git a/src/scheduler/VirtualTimeScheduler.ts b/src/scheduler/VirtualTimeScheduler.ts index 771c5ae72c..6cd31bc19f 100644 --- a/src/scheduler/VirtualTimeScheduler.ts +++ b/src/scheduler/VirtualTimeScheduler.ts @@ -58,6 +58,10 @@ export class VirtualAction extends AsyncAction { return super.schedule(state, delay); } + // If an action is rescheduled, we save allocations by mutating its state, + // pushing it to the end of the scheduler queue, and recycling the action. + // But since the VirtualTimeScheduler is used for testing, VirtualActions + // must be immutable so they can be inspected later. const action = new VirtualAction(this.scheduler, this.work); this.add(action); return action.schedule(state, delay);