Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel scheduled timeout work, if no longer needed #2135

Merged
merged 37 commits into from
Mar 27, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9441def
fix(timeout): Cancels scheduled timeout, if no longer needed
jayphelps Nov 16, 2016
9763c03
fix(timeoutWith): Cancels scheduled timeout, if no longer needed
jayphelps Nov 16, 2016
6d4937f
build(npm-scripts): update debug_mocha npm script for node 6
trxcllnt Nov 16, 2016
a219c6c
fix(VirtualAction): Block rescheduled VirtualActions from executing t…
trxcllnt Nov 16, 2016
0de6ef5
fix(timeout): Update timeout and timeoutWith to recycle their schedul…
trxcllnt Nov 20, 2016
b4d0605
Merge branch 'timeout' of https://github.com/jayphelps/rxjs into jayp…
trxcllnt Feb 7, 2017
8513fd2
Merge pull request #3 from trxcllnt/jayphelps-timeout
jayphelps Feb 7, 2017
2e2fb9d
Merge branch 'timeout' of https://github.com/jayphelps/rxjs into jayp…
trxcllnt Feb 7, 2017
9c0a8c9
test(timeout): Add types to timeout and timeoutWith specs
trxcllnt Feb 7, 2017
b31438d
Merge branch 'timeout' into jayphelps-timeout
trxcllnt Feb 7, 2017
9e16f48
Merge pull request #4 from trxcllnt/jayphelps-timeout
trxcllnt Feb 7, 2017
e14c7b8
Merge branch 'master' into timeout
trxcllnt Feb 12, 2017
cfaa876
Fix merge conflicts
trxcllnt Feb 15, 2017
44ab561
Merge branch 'master' into jayphelps-timeout
trxcllnt Feb 15, 2017
bb8c64e
Fix timeoutWith to work with new Subscriber leak fix.
trxcllnt Feb 15, 2017
e29fcdb
Merge pull request #5 from trxcllnt/jayphelps-timeout
trxcllnt Feb 15, 2017
119e0d4
fix(timeout-spec): fix merge conflicts
trxcllnt Feb 15, 2017
fc86e5e
fix(Subscription): fold ChildSubscription logic into Subscriber to pr…
trxcllnt Feb 13, 2017
5a2901d
chore(publish): 5.1.1
benlesh Feb 13, 2017
ad27711
Ignore coverage
timruffles Feb 13, 2017
24de734
fix(subscribeToResult): accept array-like as result
mpodlasin Feb 12, 2017
aef558e
feat(AjaxObservable) : support 'PATCH' request type
herflis Feb 10, 2017
39f4009
chore(ajax.patch): Adds test for ajax.patch
jayphelps Feb 13, 2017
f38a27e
fix(merge): return Observable when called with single lowerCaseO
mpodlasin Feb 12, 2017
db72756
feat(webSocket): Add binaryType to config object
mpodlasin Feb 10, 2017
ad7dd69
fix(forkJoin): add type signature for single observable with selector
mpodlasin Feb 10, 2017
2b66fc4
chore(danger): update dangerfile to validate commit message
kwonoj Feb 9, 2017
0bbfdf2
fix(bindNodeCallback): emit undefined when callback has no success ar…
mpodlasin Feb 5, 2017
dc568da
fix(bindCallback): emit undefined when callback is without arguments
mpodlasin Feb 5, 2017
d59d733
chore(*): correctly scope disabled `max-line-length` tslint rule
gkalpak Jan 23, 2017
74dd235
fix(mergeAll): introduce variant support <T, R> for mergeMap
kwonoj Feb 14, 2017
b1bb0ce
feat(windowTime): maxWindowSize parameter in windowTime operator
mpodlasin Dec 12, 2016
42a886a
docs(ObservableInput): add ObservableInput and SubscribableOrPromise …
mpodlasin Feb 6, 2017
790a469
fix(timeoutWith): update timeoutWith to work with new Subscriber leak…
trxcllnt Feb 15, 2017
0f2522d
Merge pull request #6 from trxcllnt/jayphelps-timeout
trxcllnt Feb 15, 2017
94bee5d
Merge branch 'master' into jayphelps-timeout
trxcllnt Feb 21, 2017
c9994b0
Merge pull request #7 from trxcllnt/jayphelps-timeout
trxcllnt Feb 21, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
"prepublish": "shx rm -rf ./typings && typings install && npm run build_all",
"publish_docs": "./publish_docs.sh",
"test_mocha": "mocha --opts spec/support/default.opts spec-js",
"debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js",
"debug_mocha": "node --inspect --debug-brk ./node_modules/.bin/_mocha --opts spec/support/debug.opts spec-js",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems unrelated although I agree with it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@trxcllnt I believe this is you, cool for me to remove?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as long as it's in a follow up PR that's fine with me. I didn't want to clutter up the PR list with internal config changes that only affect the devs working on the project, but I leave it up to your discretion

"test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html",
"test": "npm-run-all clean_spec build_spec test_mocha clean_spec",
"tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js",
Expand Down
23 changes: 23 additions & 0 deletions spec/operators/timeout-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,27 @@ describe('Observable.prototype.timeout', () => {
expectObservable(result).toBe(expected, values, value);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('--a--b--c---d--e--|');
const e1subs = '^ ! ';
const expected = '--a--b--c-- ';
const unsub = ' ! ';

const result = e1
.lift(function(source) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this test is very straightforward. Perhaps it could be refactored into using the let operator, and returning a more basic observable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we pair on this? I couldn't figure out how to test this at all, so @trxcllnt came up with this solution and it's still not clear to me how we could do it better. 💃

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Blesh let gives you a handle on the source Observable, but lift gives you a handle to each subscriber. In this case we want to know whether the subscriber did something (canceled its internal scheduled action subscription), so we use lift. textbook use-case for lift.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@trxcllnt ugh.. this is a case where we're abusing the fact that Operator has a call method, and we're therefor passing a function to lift here? This is really convoluted, it took me a while to figure that out. If anything this test reminded me why Operator probably shouldn't have a call method.

I think we need to find a different way to test this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jayphelps I think the way to test this, and other things like it, would be to add functionality to the TestScheduler so that it tracks the Actions it's created and those Actions can be examined to see at what tick they were cancelled. For now, we can leave these tests in here, but they'll need comments explaining what they're doing and why it works.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically we need comments explaining why lift is magically accepting a function here and not an Operator instance.

We could simply pass an object with a call method.

.lift({
  call(source) { ... }
})

Copy link
Member

@trxcllnt trxcllnt Dec 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Blesh this is exactly why Operator has a call method. In an ideal world an operator would just be a function, but because JS sucks, we've had to make them classes. See RxJava, where Operators are functions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ben is OK with this for now, but wants some inline comments about why we're doing

const timeoutSubscriber = this;
const { action } = timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source._subscribe(timeoutSubscriber);
})
.timeout(50, null, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
26 changes: 26 additions & 0 deletions spec/operators/timeoutWith-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,30 @@ describe('Observable.prototype.timeoutWith', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('---a---b-----c----|');
const e1subs = '^ ! ';
const e2 = cold( '-x---y| ');
const e2subs = ' ^ ! ';
const expected = '---a---b----x-- ';
const unsub = ' ! ';

const result = e1
.lift(function(source) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

const timeoutSubscriber = this;
const { action } = timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source._subscribe(timeoutSubscriber);
})
.timeoutWith(40, e2, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
13 changes: 12 additions & 1 deletion spec/schedulers/VirtualTimeScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => {
v.flush();
expect(count).to.equal(3);
});
});

it('should not execute virtual actions that have been rescheduled before flush', () => {
const v = new VirtualTimeScheduler();
let messages = [];
let action: VirtualAction<string> = <VirtualAction<string>> v.schedule(function(state: string) {
messages.push(state);
}, 10, 'first message');
action = <VirtualAction<string>> action.schedule('second message' , 10);
v.flush();
expect(messages).to.deep.equal(['second message']);
});
});
56 changes: 23 additions & 33 deletions src/operator/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Action } from '../scheduler/Action';
import { async } from '../scheduler/async';
import { isDate } from '../util/isDate';
import { Operator } from '../Operator';
Expand Down Expand Up @@ -44,15 +45,8 @@ class TimeoutOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutSubscriber<T> extends Subscriber<T> {
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

private action: Action<TimeoutSubscriber<T>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
Expand All @@ -63,40 +57,36 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.notifyTimeout();
}
private static dispatchTimeout<T>(subscriber: TimeoutSubscriber<T>): void {
subscriber.error(subscriber.errorToSend);
}

private scheduleTimeout(): void {
let currentIndex = this.index;
this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
this.index++;
this._previousIndex = currentIndex;
const { action } = this;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutSubscriber<T>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutSubscriber<T>>> this.scheduler.schedule(
TimeoutSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T): void {
this.destination.next(value);

if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any): void {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete(): void {
this.destination.complete();
this._hasCompleted = true;
}

notifyTimeout(): void {
this.error(this.errorToSend);
protected _unsubscribe() {
this.action = null;
this.scheduler = null;
this.errorToSend = null;
}
}
74 changes: 31 additions & 43 deletions src/operator/timeoutWith.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Action } from '../scheduler/Action';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Scheduler } from '../Scheduler';
import { async } from '../scheduler/async';
import { Subscription, TeardownLogic } from '../Subscription';
import { TeardownLogic } from '../Subscription';
import { Observable, ObservableInput } from '../Observable';
import { isDate } from '../util/isDate';
import { OuterSubscriber } from '../OuterSubscriber';
Expand Down Expand Up @@ -49,65 +50,52 @@ class TimeoutWithOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
private timeoutSubscription: Subscription = undefined;
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

constructor(public destination: Subscriber<T>,
private action: Action<TimeoutWithSubscriber<T, R>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
private waitFor: number,
private withObservable: ObservableInput<any>,
private scheduler: Scheduler) {
super();
destination.add(this);
super(destination);
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.handleTimeout();
}
private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
const { withObservable } = subscriber;
subscriber.unsubscribe();
subscriber.closed = false;
subscriber.isStopped = false;
subscriber.add(subscribeToResult(subscriber, withObservable));
}

private scheduleTimeout(): void {
let currentIndex = this.index;
const timeoutState = { subscriber: this, index: currentIndex };
this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState);
this.index++;
this._previousIndex = currentIndex;
const { action } = this;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule(
TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T) {
this.destination.next(value);
protected _next(value: T): void {
if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any) {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete() {
this.destination.complete();
this._hasCompleted = true;
}

handleTimeout(): void {
if (!this.closed) {
const withObservable = this.withObservable;
this.unsubscribe();
this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable));
}
protected _unsubscribe() {
this.action = null;
this.scheduler = null;
this.withObservable = null;
}
}
15 changes: 13 additions & 2 deletions src/scheduler/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export class VirtualTimeScheduler extends AsyncScheduler {
*/
export class VirtualAction<T> extends AsyncAction<T> {

protected active: boolean = true;

constructor(protected scheduler: VirtualTimeScheduler,
protected work: (this: VirtualAction<T>, state?: T) => void,
protected index: number = scheduler.index += 1) {
Expand All @@ -54,8 +56,11 @@ export class VirtualAction<T> extends AsyncAction<T> {
}

public schedule(state?: T, delay: number = 0): Subscription {
return !this.id ?
super.schedule(state, delay) : (
if (!this.id) {
return super.schedule(state, delay);
}
this.active = false;
return (
// If an action is rescheduled, we save allocations by mutating its state,
// pushing it to the end of the scheduler queue, and recycling the action.
// But since the VirtualTimeScheduler is used for testing, VirtualActions
Expand All @@ -77,6 +82,12 @@ export class VirtualAction<T> extends AsyncAction<T> {
return undefined;
}

protected _execute(state: T, delay: number): any {
if (this.active === true) {
return super._execute(state, delay);
}
}

public static sortActions<T>(a: VirtualAction<T>, b: VirtualAction<T>) {
if (a.delay === b.delay) {
if (a.index === b.index) {
Expand Down