Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel scheduled timeout work, if no longer needed #2135

Merged
merged 37 commits into from
Mar 27, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9441def
fix(timeout): Cancels scheduled timeout, if no longer needed
jayphelps Nov 16, 2016
9763c03
fix(timeoutWith): Cancels scheduled timeout, if no longer needed
jayphelps Nov 16, 2016
6d4937f
build(npm-scripts): update debug_mocha npm script for node 6
trxcllnt Nov 16, 2016
a219c6c
fix(VirtualAction): Block rescheduled VirtualActions from executing t…
trxcllnt Nov 16, 2016
0de6ef5
fix(timeout): Update timeout and timeoutWith to recycle their schedul…
trxcllnt Nov 20, 2016
b4d0605
Merge branch 'timeout' of https://github.com/jayphelps/rxjs into jayp…
trxcllnt Feb 7, 2017
8513fd2
Merge pull request #3 from trxcllnt/jayphelps-timeout
jayphelps Feb 7, 2017
2e2fb9d
Merge branch 'timeout' of https://github.com/jayphelps/rxjs into jayp…
trxcllnt Feb 7, 2017
9c0a8c9
test(timeout): Add types to timeout and timeoutWith specs
trxcllnt Feb 7, 2017
b31438d
Merge branch 'timeout' into jayphelps-timeout
trxcllnt Feb 7, 2017
9e16f48
Merge pull request #4 from trxcllnt/jayphelps-timeout
trxcllnt Feb 7, 2017
e14c7b8
Merge branch 'master' into timeout
trxcllnt Feb 12, 2017
cfaa876
Fix merge conflicts
trxcllnt Feb 15, 2017
44ab561
Merge branch 'master' into jayphelps-timeout
trxcllnt Feb 15, 2017
bb8c64e
Fix timeoutWith to work with new Subscriber leak fix.
trxcllnt Feb 15, 2017
e29fcdb
Merge pull request #5 from trxcllnt/jayphelps-timeout
trxcllnt Feb 15, 2017
119e0d4
fix(timeout-spec): fix merge conflicts
trxcllnt Feb 15, 2017
fc86e5e
fix(Subscription): fold ChildSubscription logic into Subscriber to pr…
trxcllnt Feb 13, 2017
5a2901d
chore(publish): 5.1.1
benlesh Feb 13, 2017
ad27711
Ignore coverage
timruffles Feb 13, 2017
24de734
fix(subscribeToResult): accept array-like as result
mpodlasin Feb 12, 2017
aef558e
feat(AjaxObservable) : support 'PATCH' request type
herflis Feb 10, 2017
39f4009
chore(ajax.patch): Adds test for ajax.patch
jayphelps Feb 13, 2017
f38a27e
fix(merge): return Observable when called with single lowerCaseO
mpodlasin Feb 12, 2017
db72756
feat(webSocket): Add binaryType to config object
mpodlasin Feb 10, 2017
ad7dd69
fix(forkJoin): add type signature for single observable with selector
mpodlasin Feb 10, 2017
2b66fc4
chore(danger): update dangerfile to validate commit message
kwonoj Feb 9, 2017
0bbfdf2
fix(bindNodeCallback): emit undefined when callback has no success ar…
mpodlasin Feb 5, 2017
dc568da
fix(bindCallback): emit undefined when callback is without arguments
mpodlasin Feb 5, 2017
d59d733
chore(*): correctly scope disabled `max-line-length` tslint rule
gkalpak Jan 23, 2017
74dd235
fix(mergeAll): introduce variant support <T, R> for mergeMap
kwonoj Feb 14, 2017
b1bb0ce
feat(windowTime): maxWindowSize parameter in windowTime operator
mpodlasin Dec 12, 2016
42a886a
docs(ObservableInput): add ObservableInput and SubscribableOrPromise …
mpodlasin Feb 6, 2017
790a469
fix(timeoutWith): update timeoutWith to work with new Subscriber leak…
trxcllnt Feb 15, 2017
0f2522d
Merge pull request #6 from trxcllnt/jayphelps-timeout
trxcllnt Feb 15, 2017
94bee5d
Merge branch 'master' into jayphelps-timeout
trxcllnt Feb 21, 2017
c9994b0
Merge pull request #7 from trxcllnt/jayphelps-timeout
trxcllnt Feb 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix(Subscription): fold ChildSubscription logic into Subscriber to pr…
…event operators from leaking ChildSubscriptions. (#2360)

The addition of ChildSubscription to fix #2244 accidentally introduced a different memory leak. Most operators that add and remove inner Subscriptions store the inner Subscriber instance, not the value returned by Subscription#add. When they try to remove the inner Subscription manually, nothing is removed, because the ChildSubscription wrapper instance is the one added to the subscriptions list.

Fixes #2355
  • Loading branch information
trxcllnt committed Feb 15, 2017
commit fc86e5e076f95dd7b9c914d2c3a3542dce1c6dd4
10 changes: 5 additions & 5 deletions spec/operators/observeOn-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,21 @@ describe('Observable.prototype.observeOn', () => {
.observeOn(Rx.Scheduler.asap)
.subscribe(
x => {
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
const observeOnSubscriber = subscription._subscriptions[0];
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
expect(observeOnSubscriber._subscriptions[1].state.notification.kind)
.to.equal('N');
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value)
expect(observeOnSubscriber._subscriptions[1].state.notification.value)
.to.equal(x);
results.push(x);
},
err => done(err),
() => {
// now that the last nexted value is done, there should only be a complete notification scheduled
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
const observeOnSubscriber = subscription._subscriptions[0];
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification
// only this completion notification should remain.
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
expect(observeOnSubscriber._subscriptions[1].state.notification.kind)
.to.equal('C');
// After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this.
expect(results).to.deep.equal([1, 2, 3]);
Expand Down
42 changes: 41 additions & 1 deletion spec/operators/switch-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,44 @@ describe('Observable.prototype.switch', () => {

expect(completed).to.be.true;
});
});

it('should not leak when child completes before each switch (prevent memory leaks #2355)', () => {
let iStream: Rx.Subject<number>;
const oStreamControl = new Rx.Subject<number>();
const oStream = oStreamControl.map(() => {
return (iStream = new Rx.Subject());
});
const switcher = oStream.switch();
const result = [];
let sub = switcher.subscribe((x: number) => result.push(x));

[0, 1, 2, 3, 4].forEach((n) => {
oStreamControl.next(n); // creates inner
iStream.complete();
});
// Expect one child of switch(): The oStream
expect(
(<any>sub)._subscriptions[0]._subscriptions.length
).to.equal(1);
sub.unsubscribe();
});

it('should not leak if we switch before child completes (prevent memory leaks #2355)', () => {
const oStreamControl = new Rx.Subject<number>();
const oStream = oStreamControl.map(() => {
return (new Rx.Subject());
});
const switcher = oStream.switch();
const result = [];
let sub = switcher.subscribe((x: number) => result.push(x));

[0, 1, 2, 3, 4].forEach((n) => {
oStreamControl.next(n); // creates inner
});
// Expect two children of switch(): The oStream and the first inner
expect(
(<any>sub)._subscriptions[0]._subscriptions.length
).to.equal(2);
sub.unsubscribe();
});
});
44 changes: 28 additions & 16 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this.destination.complete();
this.unsubscribe();
}

protected _unsubscribeAndRecycle(): Subscriber<T> {
const { _parent, _parents } = this;
this._parent = null;
this._parents = null;
this.unsubscribe();
this.closed = false;
this.isStopped = false;
this._parent = _parent;
this._parents = _parents;
return this;
}
}

/**
Expand All @@ -155,7 +167,7 @@ class SafeSubscriber<T> extends Subscriber<T> {

private _context: any;

constructor(private _parent: Subscriber<T>,
constructor(private _parentSubscriber: Subscriber<T>,
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
Expand Down Expand Up @@ -185,46 +197,46 @@ class SafeSubscriber<T> extends Subscriber<T> {

next(value?: T): void {
if (!this.isStopped && this._next) {
const { _parent } = this;
if (!_parent.syncErrorThrowable) {
const { _parentSubscriber } = this;
if (!_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._next, value);
} else if (this.__tryOrSetError(_parent, this._next, value)) {
} else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
this.unsubscribe();
}
}
}

error(err?: any): void {
if (!this.isStopped) {
const { _parent } = this;
const { _parentSubscriber } = this;
if (this._error) {
if (!_parent.syncErrorThrowable) {
if (!_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._error, err);
this.unsubscribe();
} else {
this.__tryOrSetError(_parent, this._error, err);
this.__tryOrSetError(_parentSubscriber, this._error, err);
this.unsubscribe();
}
} else if (!_parent.syncErrorThrowable) {
} else if (!_parentSubscriber.syncErrorThrowable) {
this.unsubscribe();
throw err;
} else {
_parent.syncErrorValue = err;
_parent.syncErrorThrown = true;
_parentSubscriber.syncErrorValue = err;
_parentSubscriber.syncErrorThrown = true;
this.unsubscribe();
}
}
}

complete(): void {
if (!this.isStopped) {
const { _parent } = this;
const { _parentSubscriber } = this;
if (this._complete) {
if (!_parent.syncErrorThrowable) {
if (!_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._complete);
this.unsubscribe();
} else {
this.__tryOrSetError(_parent, this._complete);
this.__tryOrSetError(_parentSubscriber, this._complete);
this.unsubscribe();
}
} else {
Expand Down Expand Up @@ -254,9 +266,9 @@ class SafeSubscriber<T> extends Subscriber<T> {
}

protected _unsubscribe(): void {
const { _parent } = this;
const { _parentSubscriber } = this;
this._context = null;
this._parent = null;
_parent.unsubscribe();
this._parentSubscriber = null;
_parentSubscriber.unsubscribe();
}
}
94 changes: 56 additions & 38 deletions src/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ export class Subscription implements ISubscription {
*/
public closed: boolean = false;

private _subscriptions: ISubscription[];
protected _parent: Subscription = null;
protected _parents: Subscription[] = null;
private _subscriptions: ISubscription[] = null;

/**
* @param {function(): void} [unsubscribe] A function describing how to
Expand All @@ -66,11 +68,26 @@ export class Subscription implements ISubscription {
return;
}

this.closed = true;

const { _unsubscribe, _subscriptions } = (<any> this);
let { _parent, _parents, _unsubscribe, _subscriptions } = (<any> this);

(<any> this)._subscriptions = null;
this.closed = true;
this._parent = null;
this._parents = null;
// null out _subscriptions first so any child subscriptions that attempt
// to remove themselves from this subscription will noop
this._subscriptions = null;

let index = -1;
let len = _parents ? _parents.length : 0;

// if this._parent is null, then so is this._parents, and we
// don't have to remove ourselves from any parent subscriptions.
while (_parent) {
_parent.remove(this);
// if this._parents is null or index >= len,
// then _parent is set to null, and the loop exits
_parent = ++index < len && _parents[index] || null;
}

if (isFunction(_unsubscribe)) {
let trial = tryCatch(_unsubscribe).call(this);
Expand All @@ -85,8 +102,8 @@ export class Subscription implements ISubscription {

if (isArray(_subscriptions)) {

let index = -1;
const len = _subscriptions.length;
index = -1;
len = _subscriptions.length;

while (++index < len) {
const sub = _subscriptions[index];
Expand Down Expand Up @@ -138,27 +155,33 @@ export class Subscription implements ISubscription {
return this;
}

let sub = (<Subscription> teardown);
let subscription = (<Subscription> teardown);

switch (typeof teardown) {
case 'function':
sub = new Subscription(<(() => void) > teardown);
subscription = new Subscription(<(() => void) > teardown);
case 'object':
if (sub.closed || typeof sub.unsubscribe !== 'function') {
return sub;
if (subscription.closed || typeof subscription.unsubscribe !== 'function') {
return subscription;
} else if (this.closed) {
sub.unsubscribe();
return sub;
subscription.unsubscribe();
return subscription;
} else if (typeof subscription._addParent !== 'function' /* quack quack */) {
const tmp = subscription;
subscription = new Subscription();
subscription._subscriptions = [tmp];
}
break;
default:
throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.');
}

const childSub = new ChildSubscription(sub, this);
this._subscriptions = this._subscriptions || [];
this._subscriptions.push(childSub);
return childSub;
const subscriptions = this._subscriptions || (this._subscriptions = []);

subscriptions.push(subscription);
subscription._addParent(this);

return subscription;
}

/**
Expand All @@ -168,37 +191,32 @@ export class Subscription implements ISubscription {
* @return {void}
*/
remove(subscription: Subscription): void {

// HACK: This might be redundant because of the logic in `add()`
if (subscription == null || (
subscription === this) || (
subscription === Subscription.EMPTY)) {
return;
}

const subscriptions = (<any> this)._subscriptions;

const subscriptions = this._subscriptions;
if (subscriptions) {
const subscriptionIndex = subscriptions.indexOf(subscription);
if (subscriptionIndex !== -1) {
subscriptions.splice(subscriptionIndex, 1);
}
}
}
}

export class ChildSubscription extends Subscription {
constructor(private _innerSub: ISubscription, private _parent: Subscription) {
super();
}

_unsubscribe() {
const { _innerSub, _parent } = this;
_parent.remove(this);
_innerSub.unsubscribe();
private _addParent(parent: Subscription) {
let { _parent, _parents } = this;
if (!_parent || _parent === parent) {
// If we don't have a parent, or the new parent is the same as the
// current parent, then set this._parent to the new parent.
this._parent = parent;
} else if (!_parents) {
// If there's already one parent, but not multiple, allocate an Array to
// store the rest of the parent Subscriptions.
this._parents = [parent];
} else if (_parents.indexOf(parent) === -1) {
// Only add the new parent to the _parents list if it's not already there.
_parents.push(parent);
}
}
}

function flattenUnsubscriptionErrors(errors: any[]) {
return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []);
}
}
4 changes: 1 addition & 3 deletions src/operator/catch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, R> {
super.error(err2);
return;
}
this.unsubscribe();
this.closed = false;
this.isStopped = false;
this._unsubscribeAndRecycle();
this.add(subscribeToResult(this, result));
}
}
Expand Down
19 changes: 8 additions & 11 deletions src/operator/observeOn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Operator } from '../Operator';
import { PartialObserver } from '../Observer';
import { Subscriber } from '../Subscriber';
import { Notification } from '../Notification';
import { TeardownLogic, Subscription } from '../Subscription';
import { TeardownLogic } from '../Subscription';
import { Action } from '../scheduler/Action';

/**
Expand Down Expand Up @@ -36,11 +36,9 @@ export class ObserveOnOperator<T> implements Operator<T, T> {
*/
export class ObserveOnSubscriber<T> extends Subscriber<T> {
static dispatch(this: Action<ObserveOnMessage>, arg: ObserveOnMessage) {
const { notification, destination, subscription } = arg;
const { notification, destination } = arg;
notification.observe(destination);
if (subscription) {
subscription.unsubscribe();
}
this.unsubscribe();
}

constructor(destination: Subscriber<T>,
Expand All @@ -50,10 +48,11 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

private scheduleMessage(notification: Notification<any>): void {
const message = new ObserveOnMessage(notification, this.destination);
message.subscription = this.add(
this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message)
);
this.add(this.scheduler.schedule(
ObserveOnSubscriber.dispatch,
this.delay,
new ObserveOnMessage(notification, this.destination)
));
}

protected _next(value: T): void {
Expand All @@ -70,8 +69,6 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

export class ObserveOnMessage {
public subscription: Subscription;

constructor(public notification: Notification<any>,
public destination: PartialObserver<any>) {
}
Expand Down
5 changes: 1 addition & 4 deletions src/operator/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ class RepeatSubscriber<T> extends Subscriber<T> {
} else if (count > -1) {
this.count = count - 1;
}
this.unsubscribe();
this.isStopped = false;
this.closed = false;
source.subscribe(this);
source.subscribe(this._unsubscribeAndRecycle());
}
}
}
Loading