From 3e9d5295f118c29193f88ea825902ac359901119 Mon Sep 17 00:00:00 2001 From: Jay Phelps Date: Mon, 27 Mar 2017 11:13:30 -0700 Subject: [PATCH] fix(timeout): Cancels scheduled timeout, if no longer needed * fix(timeout): Cancels scheduled timeout, if no longer needed fixes #2134 * fix(timeoutWith): Cancels scheduled timeout, if no longer needed * build(npm-scripts): update debug_mocha npm script for node 6 * fix(VirtualAction): Block rescheduled VirtualActions from executing their scheduled work. VirtualActions are immutable so they can be inspected by the TestScheduler. In order to mirror rescheduled stateful Actions, rescheduled VirtualActions shouldn't execute if they've been rescheduled before execution. * fix(timeout): Update timeout and timeoutWith to recycle their scheduled timeout actions. The timeout and timeoutWith operators should dispose their scheduled timeout actions on unsubscription. Also, given the new scheduling architecture, they can recycle their scheduled actions so just one action is allocated per subscription. * test(timeout): Add types to timeout and timeoutWith specs * Fix merge conflicts * Fix timeoutWith to work with new Subscriber leak fix. * fix(timeout-spec): fix merge conflicts * fix(Subscription): fold ChildSubscription logic into Subscriber to prevent operators from leaking ChildSubscriptions. (#2360) The addition of ChildSubscription to fix #2244 accidentally introduced a different memory leak. Most operators that add and remove inner Subscriptions store the inner Subscriber instance, not the value returned by Subscription#add. When they try to remove the inner Subscription manually, nothing is removed, because the ChildSubscription wrapper instance is the one added to the subscriptions list. Fixes #2355 * chore(publish): 5.1.1 * Ignore coverage It's 5.5mb that people installing this don't need :) * feat(AjaxObservable) : support 'PATCH' request type Add support of the 'PATCH' request type based on the already existing 'PUT' request. * fix(subscribeToResult): accept array-like as result Accept array-like as a result to subscribe, so that Observable.from and operators using subscribeToResult have identical behaviour. * chore(ajax.patch): Adds test for ajax.patch * fix(forkJoin): add type signature for single observable with selector Add type signature for using forkJoin with single observable as first parameter and selector function as second parameter, so that typings would not prevent usage which is permitted and properly handled by operator. Closes #2347 * feat(webSocket): Add binaryType to config object Add binaryType to config object, so that it is possible to set that parameter on underlying socket before any data emits happen. Closes #2353 * fix(merge): return Observable when called with single lowerCaseO Return Observable when merge is called with single lower case observable, so that merge would always return Observable instance. * fix(bindNodeCallback): emit undefined when callback has no success arguments Emit undefined insteady of empty array by resulting Observable, when callback function is called without success parameters. Closes #2254 * chore(danger): update dangerfile to validate commit message * chore(*): correctly scope disabled `max-line-length` tslint rule The max line length is set to 150 in 'tslint.json'. In specific regions, it is desirable to allow longer lines, so these regions should be wrapped in comments like the following: ```js // Max line length enforced here. /* tslint:disable:max-line-length */ // Max line length NOT enforced here. /* tslint:enable:max-line-length */ <-- CORRECT // Max line length enforced here. ``` In many cases, the re-enabling comment incorrectly included `disable` instead of `enable` (as shown below), which essentially keeps the `max-line-length` **disabled** for the rest of the file: ```js // Max line length enforced here. /* tslint:disable:max-line-length */ // Max line length NOT enforced here. /* tslint:disable:max-line-length */ <-- INCORRECT // Max line length NOT enforced here. ``` This commit fixes these comments, so the `max-line-length` rule is properly enforced in regions that don't need longer lines. * fix(bindCallback): emit undefined when callback is without arguments In resulting Observable emit undefined when callback is called without parameters, instead of emitting empty array. * fix(mergeAll): introduce variant support for mergeMap - closes #2372 * feat(windowTime): maxWindowSize parameter in windowTime operator Adds new parameter in windowTime operator to control how much values given window can emit. Closes #1301 * docs(ObservableInput): add ObservableInput and SubscribableOrPromise descriptions Add ObservableInput and SubscribableOrPromise interface descriptions, as well as link these interfaces in type descriptions of operators, so that users always know what kind of parameters they can pass to used methods. * fix(timeoutWith): update timeoutWith to work with new Subscriber leak fix changes --- package.json | 2 +- spec/operators/timeout-spec.ts | 28 +++++++- spec/operators/timeoutWith-spec.ts | 29 +++++++- spec/schedulers/VirtualTimeScheduler-spec.ts | 13 +++- src/operator/timeout.ts | 56 +++++++-------- src/operator/timeoutWith.ts | 72 ++++++++------------ src/scheduler/VirtualTimeScheduler.ts | 10 ++- 7 files changed, 128 insertions(+), 82 deletions(-) diff --git a/package.json b/package.json index d18276e720..9ad7955289 100644 --- a/package.json +++ b/package.json @@ -91,7 +91,7 @@ "prepublish": "shx rm -rf ./typings && typings install && npm run build_all", "publish_docs": "./publish_docs.sh", "test_mocha": "mocha --opts spec/support/default.opts spec-js", - "debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js", + "debug_mocha": "node --inspect --debug-brk ./node_modules/.bin/_mocha --opts spec/support/debug.opts spec-js", "test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html", "test": "npm-run-all clean_spec build_spec test_mocha clean_spec", "tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js", diff --git a/spec/operators/timeout-spec.ts b/spec/operators/timeout-spec.ts index 814ac7e4a8..8b616ec719 100644 --- a/spec/operators/timeout-spec.ts +++ b/spec/operators/timeout-spec.ts @@ -1,14 +1,14 @@ -import * as Rx from '../../dist/cjs/Rx'; import { expect } from 'chai'; +import * as Rx from '../../dist/cjs/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports declare const { asDiagram }; +declare const rxTestScheduler: Rx.TestScheduler; declare const hot: typeof marbleTestingSignature.hot; declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; -declare const rxTestScheduler: Rx.TestScheduler; const Observable = Rx.Observable; /** @test {timeout} */ @@ -121,4 +121,28 @@ describe('Observable.prototype.timeout', () => { expectObservable(result).toBe(expected, values, defaultTimeoutError); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('--a--b--c---d--e--|'); + const e1subs = '^ ! '; + const expected = '--a--b--c-- '; + const unsub = ' ! '; + + const result = e1 + .lift({ + call: (timeoutSubscriber, source) => { + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source.subscribe(timeoutSubscriber); + } + }) + .timeout(50, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); diff --git a/spec/operators/timeoutWith-spec.ts b/spec/operators/timeoutWith-spec.ts index 25fca612c7..5697a81017 100644 --- a/spec/operators/timeoutWith-spec.ts +++ b/spec/operators/timeoutWith-spec.ts @@ -2,12 +2,12 @@ import * as Rx from '../../dist/cjs/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports declare const { asDiagram }; +declare const rxTestScheduler: Rx.TestScheduler; declare const hot: typeof marbleTestingSignature.hot; declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; -declare const rxTestScheduler: Rx.TestScheduler; const Observable = Rx.Observable; /** @test {timeoutWith} */ @@ -266,4 +266,31 @@ describe('Observable.prototype.timeoutWith', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('---a---b-----c----|'); + const e1subs = '^ ! '; + const e2 = cold( '-x---y| '); + const e2subs = ' ^ ! '; + const expected = '---a---b----x-- '; + const unsub = ' ! '; + + const result = e1 + .lift({ + call: (timeoutSubscriber, source) => { + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source.subscribe(timeoutSubscriber); + } + }) + .timeoutWith(40, e2, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); }); diff --git a/spec/schedulers/VirtualTimeScheduler-spec.ts b/spec/schedulers/VirtualTimeScheduler-spec.ts index 6af1fe5e96..e0e113d6cf 100644 --- a/spec/schedulers/VirtualTimeScheduler-spec.ts +++ b/spec/schedulers/VirtualTimeScheduler-spec.ts @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => { v.flush(); expect(count).to.equal(3); }); -}); \ No newline at end of file + + it('should not execute virtual actions that have been rescheduled before flush', () => { + const v = new VirtualTimeScheduler(); + let messages = []; + let action: VirtualAction = > v.schedule(function(state: string) { + messages.push(state); + }, 10, 'first message'); + action = > action.schedule('second message' , 10); + v.flush(); + expect(messages).to.deep.equal(['second message']); + }); +}); diff --git a/src/operator/timeout.ts b/src/operator/timeout.ts index 9cdf160907..ab7c7edf1d 100644 --- a/src/operator/timeout.ts +++ b/src/operator/timeout.ts @@ -1,3 +1,4 @@ +import { Action } from '../scheduler/Action'; import { async } from '../scheduler/async'; import { isDate } from '../util/isDate'; import { Operator } from '../Operator'; @@ -42,15 +43,8 @@ class TimeoutOperator implements Operator { * @extends {Ignored} */ class TimeoutSubscriber extends Subscriber { - private index: number = 0; - private _previousIndex: number = 0; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } + + private action: Action> = null; constructor(destination: Subscriber, private absoluteTimeout: boolean, @@ -61,40 +55,36 @@ class TimeoutSubscriber extends Subscriber { this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (!source.hasCompleted && source.previousIndex === currentIndex) { - source.notifyTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutSubscriber): void { + subscriber.error(subscriber.errorInstance); } private scheduleTimeout(): void { - let currentIndex = this.index; - this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex }); - this.index++; - this._previousIndex = currentIndex; + const { action } = this; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutSubscriber.dispatchTimeout, this.waitFor, this + ))); + } } protected _next(value: T): void { - this.destination.next(value); - if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any): void { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete(): void { - this.destination.complete(); - this._hasCompleted = true; - } - - notifyTimeout(): void { - this.error(this.errorInstance); + protected _unsubscribe() { + this.action = null; + this.scheduler = null; + this.errorInstance = null; } } diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index 0ee90c7541..86b55d2982 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -1,8 +1,9 @@ +import { Action } from '../scheduler/Action'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { IScheduler } from '../Scheduler'; import { async } from '../scheduler/async'; -import { Subscription, TeardownLogic } from '../Subscription'; +import { TeardownLogic } from '../Subscription'; import { Observable, ObservableInput } from '../Observable'; import { isDate } from '../util/isDate'; import { OuterSubscriber } from '../OuterSubscriber'; @@ -49,65 +50,50 @@ class TimeoutWithOperator implements Operator { * @extends {Ignored} */ class TimeoutWithSubscriber extends OuterSubscriber { - private timeoutSubscription: Subscription = undefined; - private index: number = 0; - private _previousIndex: number = 0; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } - constructor(public destination: Subscriber, + private action: Action> = null; + + constructor(destination: Subscriber, private absoluteTimeout: boolean, private waitFor: number, private withObservable: ObservableInput, private scheduler: IScheduler) { - super(); - destination.add(this); + super(destination); this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (!source.hasCompleted && source.previousIndex === currentIndex) { - source.handleTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutWithSubscriber): void { + const { withObservable } = subscriber; + ( subscriber)._unsubscribeAndRecycle(); + subscriber.add(subscribeToResult(subscriber, withObservable)); } private scheduleTimeout(): void { - let currentIndex = this.index; - const timeoutState = { subscriber: this, index: currentIndex }; - this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState); - this.index++; - this._previousIndex = currentIndex; + const { action } = this; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this + ))); + } } - protected _next(value: T) { - this.destination.next(value); + protected _next(value: T): void { if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any) { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete() { - this.destination.complete(); - this._hasCompleted = true; - } - - handleTimeout(): void { - if (!this.closed) { - const withObservable = this.withObservable; - this.unsubscribe(); - this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable)); - } + protected _unsubscribe() { + this.action = null; + this.scheduler = null; + this.withObservable = null; } } diff --git a/src/scheduler/VirtualTimeScheduler.ts b/src/scheduler/VirtualTimeScheduler.ts index 6cd31bc19f..5da67912b9 100644 --- a/src/scheduler/VirtualTimeScheduler.ts +++ b/src/scheduler/VirtualTimeScheduler.ts @@ -46,6 +46,8 @@ export class VirtualTimeScheduler extends AsyncScheduler { */ export class VirtualAction extends AsyncAction { + protected active: boolean = true; + constructor(protected scheduler: VirtualTimeScheduler, protected work: (this: VirtualAction, state?: T) => void, protected index: number = scheduler.index += 1) { @@ -57,7 +59,7 @@ export class VirtualAction extends AsyncAction { if (!this.id) { return super.schedule(state, delay); } - + this.active = false; // If an action is rescheduled, we save allocations by mutating its state, // pushing it to the end of the scheduler queue, and recycling the action. // But since the VirtualTimeScheduler is used for testing, VirtualActions @@ -79,6 +81,12 @@ export class VirtualAction extends AsyncAction { return undefined; } + protected _execute(state: T, delay: number): any { + if (this.active === true) { + return super._execute(state, delay); + } + } + public static sortActions(a: VirtualAction, b: VirtualAction) { if (a.delay === b.delay) { if (a.index === b.index) {