diff --git a/package.json b/package.json index d18276e720..9ad7955289 100644 --- a/package.json +++ b/package.json @@ -91,7 +91,7 @@ "prepublish": "shx rm -rf ./typings && typings install && npm run build_all", "publish_docs": "./publish_docs.sh", "test_mocha": "mocha --opts spec/support/default.opts spec-js", - "debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js", + "debug_mocha": "node --inspect --debug-brk ./node_modules/.bin/_mocha --opts spec/support/debug.opts spec-js", "test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html", "test": "npm-run-all clean_spec build_spec test_mocha clean_spec", "tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js", diff --git a/spec/operators/timeout-spec.ts b/spec/operators/timeout-spec.ts index 814ac7e4a8..8b616ec719 100644 --- a/spec/operators/timeout-spec.ts +++ b/spec/operators/timeout-spec.ts @@ -1,14 +1,14 @@ -import * as Rx from '../../dist/cjs/Rx'; import { expect } from 'chai'; +import * as Rx from '../../dist/cjs/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports declare const { asDiagram }; +declare const rxTestScheduler: Rx.TestScheduler; declare const hot: typeof marbleTestingSignature.hot; declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; -declare const rxTestScheduler: Rx.TestScheduler; const Observable = Rx.Observable; /** @test {timeout} */ @@ -121,4 +121,28 @@ describe('Observable.prototype.timeout', () => { expectObservable(result).toBe(expected, values, defaultTimeoutError); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('--a--b--c---d--e--|'); + const e1subs = '^ ! '; + const expected = '--a--b--c-- '; + const unsub = ' ! '; + + const result = e1 + .lift({ + call: (timeoutSubscriber, source) => { + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source.subscribe(timeoutSubscriber); + } + }) + .timeout(50, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); diff --git a/spec/operators/timeoutWith-spec.ts b/spec/operators/timeoutWith-spec.ts index 25fca612c7..5697a81017 100644 --- a/spec/operators/timeoutWith-spec.ts +++ b/spec/operators/timeoutWith-spec.ts @@ -2,12 +2,12 @@ import * as Rx from '../../dist/cjs/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports declare const { asDiagram }; +declare const rxTestScheduler: Rx.TestScheduler; declare const hot: typeof marbleTestingSignature.hot; declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; -declare const rxTestScheduler: Rx.TestScheduler; const Observable = Rx.Observable; /** @test {timeoutWith} */ @@ -266,4 +266,31 @@ describe('Observable.prototype.timeoutWith', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('---a---b-----c----|'); + const e1subs = '^ ! '; + const e2 = cold( '-x---y| '); + const e2subs = ' ^ ! '; + const expected = '---a---b----x-- '; + const unsub = ' ! '; + + const result = e1 + .lift({ + call: (timeoutSubscriber, source) => { + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source.subscribe(timeoutSubscriber); + } + }) + .timeoutWith(40, e2, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); }); diff --git a/spec/schedulers/VirtualTimeScheduler-spec.ts b/spec/schedulers/VirtualTimeScheduler-spec.ts index 6af1fe5e96..e0e113d6cf 100644 --- a/spec/schedulers/VirtualTimeScheduler-spec.ts +++ b/spec/schedulers/VirtualTimeScheduler-spec.ts @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => { v.flush(); expect(count).to.equal(3); }); -}); \ No newline at end of file + + it('should not execute virtual actions that have been rescheduled before flush', () => { + const v = new VirtualTimeScheduler(); + let messages = []; + let action: VirtualAction = > v.schedule(function(state: string) { + messages.push(state); + }, 10, 'first message'); + action = > action.schedule('second message' , 10); + v.flush(); + expect(messages).to.deep.equal(['second message']); + }); +}); diff --git a/src/operator/timeout.ts b/src/operator/timeout.ts index 9cdf160907..ab7c7edf1d 100644 --- a/src/operator/timeout.ts +++ b/src/operator/timeout.ts @@ -1,3 +1,4 @@ +import { Action } from '../scheduler/Action'; import { async } from '../scheduler/async'; import { isDate } from '../util/isDate'; import { Operator } from '../Operator'; @@ -42,15 +43,8 @@ class TimeoutOperator implements Operator { * @extends {Ignored} */ class TimeoutSubscriber extends Subscriber { - private index: number = 0; - private _previousIndex: number = 0; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } + + private action: Action> = null; constructor(destination: Subscriber, private absoluteTimeout: boolean, @@ -61,40 +55,36 @@ class TimeoutSubscriber extends Subscriber { this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (!source.hasCompleted && source.previousIndex === currentIndex) { - source.notifyTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutSubscriber): void { + subscriber.error(subscriber.errorInstance); } private scheduleTimeout(): void { - let currentIndex = this.index; - this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex }); - this.index++; - this._previousIndex = currentIndex; + const { action } = this; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutSubscriber.dispatchTimeout, this.waitFor, this + ))); + } } protected _next(value: T): void { - this.destination.next(value); - if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any): void { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete(): void { - this.destination.complete(); - this._hasCompleted = true; - } - - notifyTimeout(): void { - this.error(this.errorInstance); + protected _unsubscribe() { + this.action = null; + this.scheduler = null; + this.errorInstance = null; } } diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index 0ee90c7541..86b55d2982 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -1,8 +1,9 @@ +import { Action } from '../scheduler/Action'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { IScheduler } from '../Scheduler'; import { async } from '../scheduler/async'; -import { Subscription, TeardownLogic } from '../Subscription'; +import { TeardownLogic } from '../Subscription'; import { Observable, ObservableInput } from '../Observable'; import { isDate } from '../util/isDate'; import { OuterSubscriber } from '../OuterSubscriber'; @@ -49,65 +50,50 @@ class TimeoutWithOperator implements Operator { * @extends {Ignored} */ class TimeoutWithSubscriber extends OuterSubscriber { - private timeoutSubscription: Subscription = undefined; - private index: number = 0; - private _previousIndex: number = 0; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } - constructor(public destination: Subscriber, + private action: Action> = null; + + constructor(destination: Subscriber, private absoluteTimeout: boolean, private waitFor: number, private withObservable: ObservableInput, private scheduler: IScheduler) { - super(); - destination.add(this); + super(destination); this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (!source.hasCompleted && source.previousIndex === currentIndex) { - source.handleTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutWithSubscriber): void { + const { withObservable } = subscriber; + ( subscriber)._unsubscribeAndRecycle(); + subscriber.add(subscribeToResult(subscriber, withObservable)); } private scheduleTimeout(): void { - let currentIndex = this.index; - const timeoutState = { subscriber: this, index: currentIndex }; - this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState); - this.index++; - this._previousIndex = currentIndex; + const { action } = this; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this + ))); + } } - protected _next(value: T) { - this.destination.next(value); + protected _next(value: T): void { if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any) { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete() { - this.destination.complete(); - this._hasCompleted = true; - } - - handleTimeout(): void { - if (!this.closed) { - const withObservable = this.withObservable; - this.unsubscribe(); - this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable)); - } + protected _unsubscribe() { + this.action = null; + this.scheduler = null; + this.withObservable = null; } } diff --git a/src/scheduler/VirtualTimeScheduler.ts b/src/scheduler/VirtualTimeScheduler.ts index 6cd31bc19f..5da67912b9 100644 --- a/src/scheduler/VirtualTimeScheduler.ts +++ b/src/scheduler/VirtualTimeScheduler.ts @@ -46,6 +46,8 @@ export class VirtualTimeScheduler extends AsyncScheduler { */ export class VirtualAction extends AsyncAction { + protected active: boolean = true; + constructor(protected scheduler: VirtualTimeScheduler, protected work: (this: VirtualAction, state?: T) => void, protected index: number = scheduler.index += 1) { @@ -57,7 +59,7 @@ export class VirtualAction extends AsyncAction { if (!this.id) { return super.schedule(state, delay); } - + this.active = false; // If an action is rescheduled, we save allocations by mutating its state, // pushing it to the end of the scheduler queue, and recycling the action. // But since the VirtualTimeScheduler is used for testing, VirtualActions @@ -79,6 +81,12 @@ export class VirtualAction extends AsyncAction { return undefined; } + protected _execute(state: T, delay: number): any { + if (this.active === true) { + return super._execute(state, delay); + } + } + public static sortActions(a: VirtualAction, b: VirtualAction) { if (a.delay === b.delay) { if (a.index === b.index) {