Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observe on leak #2248

Merged
merged 2 commits into from
Jan 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions spec/Subscription-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,19 @@ describe('Subscription', () => {
expect(isCalled).to.equal(true);
});

it('Should returns the passed one if passed a unsubscribed AnonymousSubscription', () => {
const sub = new Subscription();
it('Should wrap the AnonymousSubscription and return a subscription that unsubscribes and removes it when unsubbed', () => {
const sub: any = new Subscription();
let called = false;
const arg = {
isUnsubscribed: true,
unsubscribe: () => undefined,
unsubscribe: () => called = true,
};
const ret = sub.add(arg);

expect(ret).to.equal(arg);
expect(called).to.equal(false);
expect(sub._subscriptions.length).to.equal(1);
ret.unsubscribe();
expect(called).to.equal(true);
expect(sub._subscriptions.length).to.equal(0);
});

it('Should returns the passed one if passed a AnonymousSubscription having not function `unsubscribe` member', () => {
Expand Down
44 changes: 44 additions & 0 deletions spec/operators/observeOn-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import * as Rx from '../../dist/cjs/Rx';
import { expect } from 'chai';

declare const {hot, asDiagram, expectObservable, expectSubscriptions};

declare const rxTestScheduler: Rx.TestScheduler;
Expand Down Expand Up @@ -77,4 +79,46 @@ describe('Observable.prototype.observeOn', () => {
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});

it('should clean up subscriptions created by async scheduling (prevent memory leaks #2244)', (done) => {
//HACK: Deep introspection to make sure we're cleaning up notifications in scheduling.
// as the architecture changes, this test may become brittle.
const results = [];
const subscription: any = new Observable(observer => {
let i = 1;
const id = setInterval(() => {
if (i > 3) {
observer.complete();
} else {
observer.next(i++);
}
}, 0);

return () => clearInterval(id);
})
.observeOn(Rx.Scheduler.asap)
.subscribe(
x => {
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
.to.equal('N');
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value)
.to.equal(x);
results.push(x);
},
err => done(err),
() => {
// now that the last nexted value is done, there should only be a complete notification scheduled
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification
// only this completion notification should remain.
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
.to.equal('C');
// After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this.
expect(results).to.deep.equal([1, 2, 3]);
done();
}
);
});
});
35 changes: 29 additions & 6 deletions src/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export class Subscription implements ISubscription {
*/
public closed: boolean = false;

private _subscriptions: ISubscription[];

/**
* @param {function(): void} [unsubscribe] A function describing how to
* perform the disposal of resources when the `unsubscribe` method is called.
Expand Down Expand Up @@ -74,7 +76,10 @@ export class Subscription implements ISubscription {
let trial = tryCatch(_unsubscribe).call(this);
if (trial === errorObject) {
hasErrors = true;
(errors = errors || []).push(errorObject.e);
errors = errors || (
errorObject.e instanceof UnsubscriptionError ?
flattenUnsubscriptionErrors(errorObject.e.errors) : [errorObject.e]
);
}
}

Expand All @@ -92,7 +97,7 @@ export class Subscription implements ISubscription {
errors = errors || [];
let err = errorObject.e;
if (err instanceof UnsubscriptionError) {
errors = errors.concat(err.errors);
errors = errors.concat(flattenUnsubscriptionErrors(err.errors));
} else {
errors.push(err);
}
Expand Down Expand Up @@ -140,18 +145,20 @@ export class Subscription implements ISubscription {
sub = new Subscription(<(() => void) > teardown);
case 'object':
if (sub.closed || typeof sub.unsubscribe !== 'function') {
break;
return sub;
} else if (this.closed) {
sub.unsubscribe();
} else {
((<any> this)._subscriptions || ((<any> this)._subscriptions = [])).push(sub);
return sub;
}
break;
default:
throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.');
}

return sub;
const childSub = new ChildSubscription(sub, this);
this._subscriptions = this._subscriptions || [];
this._subscriptions.push(childSub);
return childSub;
}

/**
Expand Down Expand Up @@ -179,3 +186,19 @@ export class Subscription implements ISubscription {
}
}
}

export class ChildSubscription extends Subscription {
constructor(private _innerSub: ISubscription, private _parent: Subscription) {
super();
}

_unsubscribe() {
const { _innerSub, _parent } = this;
_parent.remove(this);
_innerSub.unsubscribe();
}
}

function flattenUnsubscriptionErrors(errors: any[]) {
return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []);
}
21 changes: 14 additions & 7 deletions src/operator/observeOn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { Operator } from '../Operator';
import { PartialObserver } from '../Observer';
import { Subscriber } from '../Subscriber';
import { Notification } from '../Notification';
import { TeardownLogic } from '../Subscription';
import { TeardownLogic, Subscription } from '../Subscription';
import { Action } from '../scheduler/Action';

/**
* @see {@link Notification}
Expand Down Expand Up @@ -34,9 +35,12 @@ export class ObserveOnOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
export class ObserveOnSubscriber<T> extends Subscriber<T> {
static dispatch(arg: ObserveOnMessage) {
const { notification, destination } = arg;
static dispatch(this: Action<ObserveOnMessage>, arg: ObserveOnMessage) {
const { notification, destination, subscription } = arg;
notification.observe(destination);
if (subscription) {
subscription.unsubscribe();
}
}

constructor(destination: Subscriber<T>,
Expand All @@ -46,10 +50,11 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

private scheduleMessage(notification: Notification<any>): void {
this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch,
this.delay,
new ObserveOnMessage(notification, this.destination)));
}
const message = new ObserveOnMessage(notification, this.destination);
message.subscription = this.add(
this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message)
);
}

protected _next(value: T): void {
this.scheduleMessage(Notification.createNext(value));
Expand All @@ -65,6 +70,8 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

export class ObserveOnMessage {
public subscription: Subscription;

constructor(public notification: Notification<any>,
public destination: PartialObserver<any>) {
}
Expand Down
20 changes: 11 additions & 9 deletions src/scheduler/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ export class VirtualAction<T> extends AsyncAction<T> {
}

public schedule(state?: T, delay: number = 0): Subscription {
return !this.id ?
super.schedule(state, delay) : (
// If an action is rescheduled, we save allocations by mutating its state,
// pushing it to the end of the scheduler queue, and recycling the action.
// But since the VirtualTimeScheduler is used for testing, VirtualActions
// must be immutable so they can be inspected later.
<VirtualAction<T>> this.add(
new VirtualAction<T>(this.scheduler, this.work))
).schedule(state, delay);
if (!this.id) {
return super.schedule(state, delay);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Blesh can we keep this comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


// If an action is rescheduled, we save allocations by mutating its state,
// pushing it to the end of the scheduler queue, and recycling the action.
// But since the VirtualTimeScheduler is used for testing, VirtualActions
// must be immutable so they can be inspected later.
const action = new VirtualAction(this.scheduler, this.work);
this.add(action);
return action.schedule(state, delay);
}

protected requestAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay: number = 0): any {
Expand Down