Skip to content

Commit

Permalink
Merge pull request #2248 from blesh/observeOn-leak
Browse files Browse the repository at this point in the history
Observe on leak
  • Loading branch information
benlesh authored Jan 4, 2017
2 parents 6922b16 + 9664a38 commit f92eea7
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 27 deletions.
14 changes: 9 additions & 5 deletions spec/Subscription-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,19 @@ describe('Subscription', () => {
expect(isCalled).to.equal(true);
});

it('Should returns the passed one if passed a unsubscribed AnonymousSubscription', () => {
const sub = new Subscription();
it('Should wrap the AnonymousSubscription and return a subscription that unsubscribes and removes it when unsubbed', () => {
const sub: any = new Subscription();
let called = false;
const arg = {
isUnsubscribed: true,
unsubscribe: () => undefined,
unsubscribe: () => called = true,
};
const ret = sub.add(arg);

expect(ret).to.equal(arg);
expect(called).to.equal(false);
expect(sub._subscriptions.length).to.equal(1);
ret.unsubscribe();
expect(called).to.equal(true);
expect(sub._subscriptions.length).to.equal(0);
});

it('Should returns the passed one if passed a AnonymousSubscription having not function `unsubscribe` member', () => {
Expand Down
44 changes: 44 additions & 0 deletions spec/operators/observeOn-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import * as Rx from '../../dist/cjs/Rx';
import { expect } from 'chai';

declare const {hot, asDiagram, expectObservable, expectSubscriptions};

declare const rxTestScheduler: Rx.TestScheduler;
Expand Down Expand Up @@ -77,4 +79,46 @@ describe('Observable.prototype.observeOn', () => {
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});

it('should clean up subscriptions created by async scheduling (prevent memory leaks #2244)', (done) => {
//HACK: Deep introspection to make sure we're cleaning up notifications in scheduling.
// as the architecture changes, this test may become brittle.
const results = [];
const subscription: any = new Observable(observer => {
let i = 1;
const id = setInterval(() => {
if (i > 3) {
observer.complete();
} else {
observer.next(i++);
}
}, 0);

return () => clearInterval(id);
})
.observeOn(Rx.Scheduler.asap)
.subscribe(
x => {
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
.to.equal('N');
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value)
.to.equal(x);
results.push(x);
},
err => done(err),
() => {
// now that the last nexted value is done, there should only be a complete notification scheduled
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification
// only this completion notification should remain.
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
.to.equal('C');
// After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this.
expect(results).to.deep.equal([1, 2, 3]);
done();
}
);
});
});
35 changes: 29 additions & 6 deletions src/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export class Subscription implements ISubscription {
*/
public closed: boolean = false;

private _subscriptions: ISubscription[];

/**
* @param {function(): void} [unsubscribe] A function describing how to
* perform the disposal of resources when the `unsubscribe` method is called.
Expand Down Expand Up @@ -74,7 +76,10 @@ export class Subscription implements ISubscription {
let trial = tryCatch(_unsubscribe).call(this);
if (trial === errorObject) {
hasErrors = true;
(errors = errors || []).push(errorObject.e);
errors = errors || (
errorObject.e instanceof UnsubscriptionError ?
flattenUnsubscriptionErrors(errorObject.e.errors) : [errorObject.e]
);
}
}

Expand All @@ -92,7 +97,7 @@ export class Subscription implements ISubscription {
errors = errors || [];
let err = errorObject.e;
if (err instanceof UnsubscriptionError) {
errors = errors.concat(err.errors);
errors = errors.concat(flattenUnsubscriptionErrors(err.errors));
} else {
errors.push(err);
}
Expand Down Expand Up @@ -140,18 +145,20 @@ export class Subscription implements ISubscription {
sub = new Subscription(<(() => void) > teardown);
case 'object':
if (sub.closed || typeof sub.unsubscribe !== 'function') {
break;
return sub;
} else if (this.closed) {
sub.unsubscribe();
} else {
((<any> this)._subscriptions || ((<any> this)._subscriptions = [])).push(sub);
return sub;
}
break;
default:
throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.');
}

return sub;
const childSub = new ChildSubscription(sub, this);
this._subscriptions = this._subscriptions || [];
this._subscriptions.push(childSub);
return childSub;
}

/**
Expand Down Expand Up @@ -179,3 +186,19 @@ export class Subscription implements ISubscription {
}
}
}

export class ChildSubscription extends Subscription {
constructor(private _innerSub: ISubscription, private _parent: Subscription) {
super();
}

_unsubscribe() {
const { _innerSub, _parent } = this;
_parent.remove(this);
_innerSub.unsubscribe();
}
}

function flattenUnsubscriptionErrors(errors: any[]) {
return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []);
}
21 changes: 14 additions & 7 deletions src/operator/observeOn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { Operator } from '../Operator';
import { PartialObserver } from '../Observer';
import { Subscriber } from '../Subscriber';
import { Notification } from '../Notification';
import { TeardownLogic } from '../Subscription';
import { TeardownLogic, Subscription } from '../Subscription';
import { Action } from '../scheduler/Action';

/**
* @see {@link Notification}
Expand Down Expand Up @@ -34,9 +35,12 @@ export class ObserveOnOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
export class ObserveOnSubscriber<T> extends Subscriber<T> {
static dispatch(arg: ObserveOnMessage) {
const { notification, destination } = arg;
static dispatch(this: Action<ObserveOnMessage>, arg: ObserveOnMessage) {
const { notification, destination, subscription } = arg;
notification.observe(destination);
if (subscription) {
subscription.unsubscribe();
}
}

constructor(destination: Subscriber<T>,
Expand All @@ -46,10 +50,11 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

private scheduleMessage(notification: Notification<any>): void {
this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch,
this.delay,
new ObserveOnMessage(notification, this.destination)));
}
const message = new ObserveOnMessage(notification, this.destination);
message.subscription = this.add(
this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message)
);
}

protected _next(value: T): void {
this.scheduleMessage(Notification.createNext(value));
Expand All @@ -65,6 +70,8 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

export class ObserveOnMessage {
public subscription: Subscription;

constructor(public notification: Notification<any>,
public destination: PartialObserver<any>) {
}
Expand Down
20 changes: 11 additions & 9 deletions src/scheduler/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ export class VirtualAction<T> extends AsyncAction<T> {
}

public schedule(state?: T, delay: number = 0): Subscription {
return !this.id ?
super.schedule(state, delay) : (
// If an action is rescheduled, we save allocations by mutating its state,
// pushing it to the end of the scheduler queue, and recycling the action.
// But since the VirtualTimeScheduler is used for testing, VirtualActions
// must be immutable so they can be inspected later.
<VirtualAction<T>> this.add(
new VirtualAction<T>(this.scheduler, this.work))
).schedule(state, delay);
if (!this.id) {
return super.schedule(state, delay);
}

// If an action is rescheduled, we save allocations by mutating its state,
// pushing it to the end of the scheduler queue, and recycling the action.
// But since the VirtualTimeScheduler is used for testing, VirtualActions
// must be immutable so they can be inspected later.
const action = new VirtualAction(this.scheduler, this.work);
this.add(action);
return action.schedule(state, delay);
}

protected requestAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay: number = 0): any {
Expand Down

0 comments on commit f92eea7

Please sign in to comment.