Skip to content

Commit

Permalink
fix(Subscription): fold ChildSubscription logic into Subscriber to pr…
Browse files Browse the repository at this point in the history
…event operators from leaking ChildSubscriptions.

The addition of ChildSubscription to fix ReactiveX#2244 accidentally introduced a different memory leak. Most operators that add and remove inner Subscriptions store the inner Subscriber instance, not the value returned by Subscription#add. When they try to remove the inner Subscription manually, nothing is removed, because the ChildSubscription wrapper instance is the one added to the subscriptions list.

Fixes ReactiveX#2355
  • Loading branch information
trxcllnt committed Feb 13, 2017
1 parent 31dfc73 commit 9567ca0
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 106 deletions.
10 changes: 5 additions & 5 deletions spec/operators/observeOn-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,21 @@ describe('Observable.prototype.observeOn', () => {
.observeOn(Rx.Scheduler.asap)
.subscribe(
x => {
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
const observeOnSubscriber = subscription._subscriptions[0];
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
expect(observeOnSubscriber._subscriptions[1].state.notification.kind)
.to.equal('N');
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value)
expect(observeOnSubscriber._subscriptions[1].state.notification.value)
.to.equal(x);
results.push(x);
},
err => done(err),
() => {
// now that the last nexted value is done, there should only be a complete notification scheduled
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
const observeOnSubscriber = subscription._subscriptions[0];
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification
// only this completion notification should remain.
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
expect(observeOnSubscriber._subscriptions[1].state.notification.kind)
.to.equal('C');
// After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this.
expect(results).to.deep.equal([1, 2, 3]);
Expand Down
42 changes: 41 additions & 1 deletion spec/operators/switch-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,44 @@ describe('Observable.prototype.switch', () => {

expect(completed).to.be.true;
});
});

it('should not leak when child completes before each switch (prevent memory leaks #2355)', () => {
let iStream: Rx.Subject<number>;
const oStreamControl = new Rx.Subject<number>();
const oStream = oStreamControl.map(() => {
return (iStream = new Rx.Subject());
});
const switcher = oStream.switch();
const result = [];
let sub = switcher.subscribe((x: number) => result.push(x));

[0, 1, 2, 3, 4].forEach((n) => {
oStreamControl.next(n); // creates inner
iStream.complete();
});
// Expect one child of switch(): The oStream
expect(
(<any>sub)._subscriptions[0]._subscriptions.length
).to.equal(1);
sub.unsubscribe();
});

it('should not leak if we switch before child completes (prevent memory leaks #2355)', () => {
const oStreamControl = new Rx.Subject<number>();
const oStream = oStreamControl.map(() => {
return (new Rx.Subject());
});
const switcher = oStream.switch();
const result = [];
let sub = switcher.subscribe((x: number) => result.push(x));

[0, 1, 2, 3, 4].forEach((n) => {
oStreamControl.next(n); // creates inner
});
// Expect two children of switch(): The oStream and the first inner
expect(
(<any>sub)._subscriptions[0]._subscriptions.length
).to.equal(2);
sub.unsubscribe();
});
});
44 changes: 28 additions & 16 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this.destination.complete();
this.unsubscribe();
}

protected _unsubscribeAndRecycle(): Subscriber<T> {
const { _parent, _parents } = this;
this._parent = null;
this._parents = null;
this.unsubscribe();
this.closed = false;
this.isStopped = false;
this._parent = _parent;
this._parents = _parents;
return this;
}
}

/**
Expand All @@ -155,7 +167,7 @@ class SafeSubscriber<T> extends Subscriber<T> {

private _context: any;

constructor(private _parent: Subscriber<T>,
constructor(private _parentSubscriber: Subscriber<T>,
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
Expand Down Expand Up @@ -185,46 +197,46 @@ class SafeSubscriber<T> extends Subscriber<T> {

next(value?: T): void {
if (!this.isStopped && this._next) {
const { _parent } = this;
if (!_parent.syncErrorThrowable) {
const { _parentSubscriber } = this;
if (!_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._next, value);
} else if (this.__tryOrSetError(_parent, this._next, value)) {
} else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
this.unsubscribe();
}
}
}

error(err?: any): void {
if (!this.isStopped) {
const { _parent } = this;
const { _parentSubscriber } = this;
if (this._error) {
if (!_parent.syncErrorThrowable) {
if (!_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._error, err);
this.unsubscribe();
} else {
this.__tryOrSetError(_parent, this._error, err);
this.__tryOrSetError(_parentSubscriber, this._error, err);
this.unsubscribe();
}
} else if (!_parent.syncErrorThrowable) {
} else if (!_parentSubscriber.syncErrorThrowable) {
this.unsubscribe();
throw err;
} else {
_parent.syncErrorValue = err;
_parent.syncErrorThrown = true;
_parentSubscriber.syncErrorValue = err;
_parentSubscriber.syncErrorThrown = true;
this.unsubscribe();
}
}
}

complete(): void {
if (!this.isStopped) {
const { _parent } = this;
const { _parentSubscriber } = this;
if (this._complete) {
if (!_parent.syncErrorThrowable) {
if (!_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._complete);
this.unsubscribe();
} else {
this.__tryOrSetError(_parent, this._complete);
this.__tryOrSetError(_parentSubscriber, this._complete);
this.unsubscribe();
}
} else {
Expand Down Expand Up @@ -254,9 +266,9 @@ class SafeSubscriber<T> extends Subscriber<T> {
}

protected _unsubscribe(): void {
const { _parent } = this;
const { _parentSubscriber } = this;
this._context = null;
this._parent = null;
_parent.unsubscribe();
this._parentSubscriber = null;
_parentSubscriber.unsubscribe();
}
}
94 changes: 56 additions & 38 deletions src/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ export class Subscription implements ISubscription {
*/
public closed: boolean = false;

private _subscriptions: ISubscription[];
protected _parent: Subscription = null;
protected _parents: Subscription[] = null;
private _subscriptions: ISubscription[] = null;

/**
* @param {function(): void} [unsubscribe] A function describing how to
Expand All @@ -66,11 +68,26 @@ export class Subscription implements ISubscription {
return;
}

this.closed = true;

const { _unsubscribe, _subscriptions } = (<any> this);
let { _parent, _parents, _unsubscribe, _subscriptions } = (<any> this);

(<any> this)._subscriptions = null;
this.closed = true;
this._parent = null;
this._parents = null;
// null out _subscriptions first so any child subscriptions that attempt
// to remove themselves from this subscription will noop
this._subscriptions = null;

let index = -1;
let len = _parents ? _parents.length : 0;

// if this._parent is null, then so is this._parents, and we
// don't have to remove ourselves from any parent subscriptions.
while (_parent) {
_parent.remove(this);
// if this._parents is null or index >= len,
// then _parent is set to null, and the loop exits
_parent = ++index < len && _parents[index] || null;
}

if (isFunction(_unsubscribe)) {
let trial = tryCatch(_unsubscribe).call(this);
Expand All @@ -85,8 +102,8 @@ export class Subscription implements ISubscription {

if (isArray(_subscriptions)) {

let index = -1;
const len = _subscriptions.length;
index = -1;
len = _subscriptions.length;

while (++index < len) {
const sub = _subscriptions[index];
Expand Down Expand Up @@ -138,27 +155,33 @@ export class Subscription implements ISubscription {
return this;
}

let sub = (<Subscription> teardown);
let subscription = (<Subscription> teardown);

switch (typeof teardown) {
case 'function':
sub = new Subscription(<(() => void) > teardown);
subscription = new Subscription(<(() => void) > teardown);
case 'object':
if (sub.closed || typeof sub.unsubscribe !== 'function') {
return sub;
if (subscription.closed || typeof subscription.unsubscribe !== 'function') {
return subscription;
} else if (this.closed) {
sub.unsubscribe();
return sub;
subscription.unsubscribe();
return subscription;
} else if (typeof subscription._addParent !== 'function' /* quack quack */) {
const tmp = subscription;
subscription = new Subscription();
subscription._subscriptions = [tmp];
}
break;
default:
throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.');
}

const childSub = new ChildSubscription(sub, this);
this._subscriptions = this._subscriptions || [];
this._subscriptions.push(childSub);
return childSub;
const subscriptions = this._subscriptions || (this._subscriptions = []);

subscriptions.push(subscription);
subscription._addParent(this);

return subscription;
}

/**
Expand All @@ -168,37 +191,32 @@ export class Subscription implements ISubscription {
* @return {void}
*/
remove(subscription: Subscription): void {

// HACK: This might be redundant because of the logic in `add()`
if (subscription == null || (
subscription === this) || (
subscription === Subscription.EMPTY)) {
return;
}

const subscriptions = (<any> this)._subscriptions;

const subscriptions = this._subscriptions;
if (subscriptions) {
const subscriptionIndex = subscriptions.indexOf(subscription);
if (subscriptionIndex !== -1) {
subscriptions.splice(subscriptionIndex, 1);
}
}
}
}

export class ChildSubscription extends Subscription {
constructor(private _innerSub: ISubscription, private _parent: Subscription) {
super();
}

_unsubscribe() {
const { _innerSub, _parent } = this;
_parent.remove(this);
_innerSub.unsubscribe();
private _addParent(parent: Subscription) {
let { _parent, _parents } = this;
if (!_parent || _parent === parent) {
// If we don't have a parent, or the new parent is the same as the
// current parent, then set this._parent to the new parent.
this._parent = parent;
} else if (!_parents) {
// If there's already one parent, but not multiple, allocate an Array to
// store the rest of the parent Subscriptions.
this._parents = [parent];
} else if (_parents.indexOf(parent) === -1) {
// Only add the new parent to the _parents list if it's not already there.
_parents.push(parent);
}
}
}

function flattenUnsubscriptionErrors(errors: any[]) {
return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []);
}
}
4 changes: 1 addition & 3 deletions src/operator/catch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, R> {
super.error(err2);
return;
}
this.unsubscribe();
this.closed = false;
this.isStopped = false;
this._unsubscribeAndRecycle();
this.add(subscribeToResult(this, result));
}
}
Expand Down
19 changes: 8 additions & 11 deletions src/operator/observeOn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Operator } from '../Operator';
import { PartialObserver } from '../Observer';
import { Subscriber } from '../Subscriber';
import { Notification } from '../Notification';
import { TeardownLogic, Subscription } from '../Subscription';
import { TeardownLogic } from '../Subscription';
import { Action } from '../scheduler/Action';

/**
Expand Down Expand Up @@ -36,11 +36,9 @@ export class ObserveOnOperator<T> implements Operator<T, T> {
*/
export class ObserveOnSubscriber<T> extends Subscriber<T> {
static dispatch(this: Action<ObserveOnMessage>, arg: ObserveOnMessage) {
const { notification, destination, subscription } = arg;
const { notification, destination } = arg;
notification.observe(destination);
if (subscription) {
subscription.unsubscribe();
}
this.unsubscribe();
}

constructor(destination: Subscriber<T>,
Expand All @@ -50,10 +48,11 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

private scheduleMessage(notification: Notification<any>): void {
const message = new ObserveOnMessage(notification, this.destination);
message.subscription = this.add(
this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message)
);
this.add(this.scheduler.schedule(
ObserveOnSubscriber.dispatch,
this.delay,
new ObserveOnMessage(notification, this.destination)
));
}

protected _next(value: T): void {
Expand All @@ -70,8 +69,6 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

export class ObserveOnMessage {
public subscription: Subscription;

constructor(public notification: Notification<any>,
public destination: PartialObserver<any>) {
}
Expand Down
5 changes: 1 addition & 4 deletions src/operator/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ class RepeatSubscriber<T> extends Subscriber<T> {
} else if (count > -1) {
this.count = count - 1;
}
this.unsubscribe();
this.isStopped = false;
this.closed = false;
source.subscribe(this);
source.subscribe(this._unsubscribeAndRecycle());
}
}
}
Loading

0 comments on commit 9567ca0

Please sign in to comment.