Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel scheduled timeout work, if no longer needed #2135

Merged
merged 37 commits into from
Mar 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9441def
fix(timeout): Cancels scheduled timeout, if no longer needed
jayphelps Nov 16, 2016
9763c03
fix(timeoutWith): Cancels scheduled timeout, if no longer needed
jayphelps Nov 16, 2016
6d4937f
build(npm-scripts): update debug_mocha npm script for node 6
trxcllnt Nov 16, 2016
a219c6c
fix(VirtualAction): Block rescheduled VirtualActions from executing t…
trxcllnt Nov 16, 2016
0de6ef5
fix(timeout): Update timeout and timeoutWith to recycle their schedul…
trxcllnt Nov 20, 2016
b4d0605
Merge branch 'timeout' of https://github.com/jayphelps/rxjs into jayp…
trxcllnt Feb 7, 2017
8513fd2
Merge pull request #3 from trxcllnt/jayphelps-timeout
jayphelps Feb 7, 2017
2e2fb9d
Merge branch 'timeout' of https://github.com/jayphelps/rxjs into jayp…
trxcllnt Feb 7, 2017
9c0a8c9
test(timeout): Add types to timeout and timeoutWith specs
trxcllnt Feb 7, 2017
b31438d
Merge branch 'timeout' into jayphelps-timeout
trxcllnt Feb 7, 2017
9e16f48
Merge pull request #4 from trxcllnt/jayphelps-timeout
trxcllnt Feb 7, 2017
e14c7b8
Merge branch 'master' into timeout
trxcllnt Feb 12, 2017
cfaa876
Fix merge conflicts
trxcllnt Feb 15, 2017
44ab561
Merge branch 'master' into jayphelps-timeout
trxcllnt Feb 15, 2017
bb8c64e
Fix timeoutWith to work with new Subscriber leak fix.
trxcllnt Feb 15, 2017
e29fcdb
Merge pull request #5 from trxcllnt/jayphelps-timeout
trxcllnt Feb 15, 2017
119e0d4
fix(timeout-spec): fix merge conflicts
trxcllnt Feb 15, 2017
fc86e5e
fix(Subscription): fold ChildSubscription logic into Subscriber to pr…
trxcllnt Feb 13, 2017
5a2901d
chore(publish): 5.1.1
benlesh Feb 13, 2017
ad27711
Ignore coverage
timruffles Feb 13, 2017
24de734
fix(subscribeToResult): accept array-like as result
mpodlasin Feb 12, 2017
aef558e
feat(AjaxObservable) : support 'PATCH' request type
herflis Feb 10, 2017
39f4009
chore(ajax.patch): Adds test for ajax.patch
jayphelps Feb 13, 2017
f38a27e
fix(merge): return Observable when called with single lowerCaseO
mpodlasin Feb 12, 2017
db72756
feat(webSocket): Add binaryType to config object
mpodlasin Feb 10, 2017
ad7dd69
fix(forkJoin): add type signature for single observable with selector
mpodlasin Feb 10, 2017
2b66fc4
chore(danger): update dangerfile to validate commit message
kwonoj Feb 9, 2017
0bbfdf2
fix(bindNodeCallback): emit undefined when callback has no success ar…
mpodlasin Feb 5, 2017
dc568da
fix(bindCallback): emit undefined when callback is without arguments
mpodlasin Feb 5, 2017
d59d733
chore(*): correctly scope disabled `max-line-length` tslint rule
gkalpak Jan 23, 2017
74dd235
fix(mergeAll): introduce variant support <T, R> for mergeMap
kwonoj Feb 14, 2017
b1bb0ce
feat(windowTime): maxWindowSize parameter in windowTime operator
mpodlasin Dec 12, 2016
42a886a
docs(ObservableInput): add ObservableInput and SubscribableOrPromise …
mpodlasin Feb 6, 2017
790a469
fix(timeoutWith): update timeoutWith to work with new Subscriber leak…
trxcllnt Feb 15, 2017
0f2522d
Merge pull request #6 from trxcllnt/jayphelps-timeout
trxcllnt Feb 15, 2017
94bee5d
Merge branch 'master' into jayphelps-timeout
trxcllnt Feb 21, 2017
c9994b0
Merge pull request #7 from trxcllnt/jayphelps-timeout
trxcllnt Feb 21, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
"prepublish": "shx rm -rf ./typings && typings install && npm run build_all",
"publish_docs": "./publish_docs.sh",
"test_mocha": "mocha --opts spec/support/default.opts spec-js",
"debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js",
"debug_mocha": "node --inspect --debug-brk ./node_modules/.bin/_mocha --opts spec/support/debug.opts spec-js",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems unrelated although I agree with it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@trxcllnt I believe this is you, cool for me to remove?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as long as it's in a follow up PR that's fine with me. I didn't want to clutter up the PR list with internal config changes that only affect the devs working on the project, but I leave it up to your discretion

"test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html",
"test": "npm-run-all clean_spec build_spec test_mocha clean_spec",
"tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js",
Expand Down
28 changes: 26 additions & 2 deletions spec/operators/timeout-spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import * as Rx from '../../dist/cjs/Rx';
import { expect } from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const { asDiagram };
declare const rxTestScheduler: Rx.TestScheduler;
declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;

/** @test {timeout} */
Expand Down Expand Up @@ -121,4 +121,28 @@ describe('Observable.prototype.timeout', () => {
expectObservable(result).toBe(expected, values, defaultTimeoutError);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('--a--b--c---d--e--|');
const e1subs = '^ ! ';
const expected = '--a--b--c-- ';
const unsub = ' ! ';

const result = e1
.lift({
call: (timeoutSubscriber, source) => {
const { action } = <any> timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source.subscribe(timeoutSubscriber);
}
})
.timeout(50, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
29 changes: 28 additions & 1 deletion spec/operators/timeoutWith-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const { asDiagram };
declare const rxTestScheduler: Rx.TestScheduler;
declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;

/** @test {timeoutWith} */
Expand Down Expand Up @@ -266,4 +266,31 @@ describe('Observable.prototype.timeoutWith', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('---a---b-----c----|');
const e1subs = '^ ! ';
const e2 = cold( '-x---y| ');
const e2subs = ' ^ ! ';
const expected = '---a---b----x-- ';
const unsub = ' ! ';

const result = e1
.lift({
call: (timeoutSubscriber, source) => {
const { action } = <any> timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source.subscribe(timeoutSubscriber);
}
})
.timeoutWith(40, e2, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
13 changes: 12 additions & 1 deletion spec/schedulers/VirtualTimeScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => {
v.flush();
expect(count).to.equal(3);
});
});

it('should not execute virtual actions that have been rescheduled before flush', () => {
const v = new VirtualTimeScheduler();
let messages = [];
let action: VirtualAction<string> = <VirtualAction<string>> v.schedule(function(state: string) {
messages.push(state);
}, 10, 'first message');
action = <VirtualAction<string>> action.schedule('second message' , 10);
v.flush();
expect(messages).to.deep.equal(['second message']);
});
});
56 changes: 23 additions & 33 deletions src/operator/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Action } from '../scheduler/Action';
import { async } from '../scheduler/async';
import { isDate } from '../util/isDate';
import { Operator } from '../Operator';
Expand Down Expand Up @@ -42,15 +43,8 @@ class TimeoutOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutSubscriber<T> extends Subscriber<T> {
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

private action: Action<TimeoutSubscriber<T>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
Expand All @@ -61,40 +55,36 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.notifyTimeout();
}
private static dispatchTimeout<T>(subscriber: TimeoutSubscriber<T>): void {
subscriber.error(subscriber.errorInstance);
}

private scheduleTimeout(): void {
let currentIndex = this.index;
this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
this.index++;
this._previousIndex = currentIndex;
const { action } = this;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutSubscriber<T>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutSubscriber<T>>> this.scheduler.schedule(
TimeoutSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T): void {
this.destination.next(value);

if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any): void {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete(): void {
this.destination.complete();
this._hasCompleted = true;
}

notifyTimeout(): void {
this.error(this.errorInstance);
protected _unsubscribe() {
this.action = null;
this.scheduler = null;
this.errorInstance = null;
}
}
72 changes: 29 additions & 43 deletions src/operator/timeoutWith.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Action } from '../scheduler/Action';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { IScheduler } from '../Scheduler';
import { async } from '../scheduler/async';
import { Subscription, TeardownLogic } from '../Subscription';
import { TeardownLogic } from '../Subscription';
import { Observable, ObservableInput } from '../Observable';
import { isDate } from '../util/isDate';
import { OuterSubscriber } from '../OuterSubscriber';
Expand Down Expand Up @@ -49,65 +50,50 @@ class TimeoutWithOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
private timeoutSubscription: Subscription = undefined;
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

constructor(public destination: Subscriber<T>,
private action: Action<TimeoutWithSubscriber<T, R>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
private waitFor: number,
private withObservable: ObservableInput<any>,
private scheduler: IScheduler) {
super();
destination.add(this);
super(destination);
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.handleTimeout();
}
private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
const { withObservable } = subscriber;
(<any> subscriber)._unsubscribeAndRecycle();
subscriber.add(subscribeToResult(subscriber, withObservable));
}

private scheduleTimeout(): void {
let currentIndex = this.index;
const timeoutState = { subscriber: this, index: currentIndex };
this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState);
this.index++;
this._previousIndex = currentIndex;
const { action } = this;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule(
TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T) {
this.destination.next(value);
protected _next(value: T): void {
if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any) {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete() {
this.destination.complete();
this._hasCompleted = true;
}

handleTimeout(): void {
if (!this.closed) {
const withObservable = this.withObservable;
this.unsubscribe();
this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable));
}
protected _unsubscribe() {
this.action = null;
this.scheduler = null;
this.withObservable = null;
}
}
10 changes: 9 additions & 1 deletion src/scheduler/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export class VirtualTimeScheduler extends AsyncScheduler {
*/
export class VirtualAction<T> extends AsyncAction<T> {

protected active: boolean = true;

constructor(protected scheduler: VirtualTimeScheduler,
protected work: (this: VirtualAction<T>, state?: T) => void,
protected index: number = scheduler.index += 1) {
Expand All @@ -57,7 +59,7 @@ export class VirtualAction<T> extends AsyncAction<T> {
if (!this.id) {
return super.schedule(state, delay);
}

this.active = false;
// If an action is rescheduled, we save allocations by mutating its state,
// pushing it to the end of the scheduler queue, and recycling the action.
// But since the VirtualTimeScheduler is used for testing, VirtualActions
Expand All @@ -79,6 +81,12 @@ export class VirtualAction<T> extends AsyncAction<T> {
return undefined;
}

protected _execute(state: T, delay: number): any {
if (this.active === true) {
return super._execute(state, delay);
}
}

public static sortActions<T>(a: VirtualAction<T>, b: VirtualAction<T>) {
if (a.delay === b.delay) {
if (a.index === b.index) {
Expand Down