diff --git a/spec/operators/timeout-spec.ts b/spec/operators/timeout-spec.ts index 95e66bde9b..abea44e91a 100644 --- a/spec/operators/timeout-spec.ts +++ b/spec/operators/timeout-spec.ts @@ -139,4 +139,27 @@ describe('Observable.prototype.timeout', () => { expectObservable(result).toBe(expected, values, value); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('--a--b--c---d--e--|'); + const e1subs = '^ ! '; + const expected = '--a--b--c-- '; + const unsub = ' ! '; + + const result = e1 + .lift(function(source) { + const timeoutSubscriber = this; + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source._subscribe(timeoutSubscriber); + }) + .timeout(50, null, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); diff --git a/spec/operators/timeoutWith-spec.ts b/spec/operators/timeoutWith-spec.ts index fad796032f..ffe082a03c 100644 --- a/spec/operators/timeoutWith-spec.ts +++ b/spec/operators/timeoutWith-spec.ts @@ -260,4 +260,30 @@ describe('Observable.prototype.timeoutWith', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('---a---b-----c----|'); + const e1subs = '^ ! '; + const e2 = cold( '-x---y| '); + const e2subs = ' ^ ! '; + const expected = '---a---b----x-- '; + const unsub = ' ! '; + + const result = e1 + .lift(function(source) { + const timeoutSubscriber = this; + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source._subscribe(timeoutSubscriber); + }) + .timeoutWith(40, e2, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); }); diff --git a/spec/schedulers/VirtualTimeScheduler-spec.ts b/spec/schedulers/VirtualTimeScheduler-spec.ts index 6af1fe5e96..e0e113d6cf 100644 --- a/spec/schedulers/VirtualTimeScheduler-spec.ts +++ b/spec/schedulers/VirtualTimeScheduler-spec.ts @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => { v.flush(); expect(count).to.equal(3); }); -}); \ No newline at end of file + + it('should not execute virtual actions that have been rescheduled before flush', () => { + const v = new VirtualTimeScheduler(); + let messages = []; + let action: VirtualAction = > v.schedule(function(state: string) { + messages.push(state); + }, 10, 'first message'); + action = > action.schedule('second message' , 10); + v.flush(); + expect(messages).to.deep.equal(['second message']); + }); +}); diff --git a/src/operator/timeout.ts b/src/operator/timeout.ts index 461a4e09ea..6bdde5b3a1 100644 --- a/src/operator/timeout.ts +++ b/src/operator/timeout.ts @@ -1,3 +1,4 @@ +import { Action } from '../scheduler/Action'; import { async } from '../scheduler/async'; import { isDate } from '../util/isDate'; import { Operator } from '../Operator'; @@ -5,7 +6,6 @@ import { Subscriber } from '../Subscriber'; import { Scheduler } from '../Scheduler'; import { Observable } from '../Observable'; import { TeardownLogic } from '../Subscription'; -import { Subscription } from '../Subscription'; import { TimeoutError } from '../util/TimeoutError'; /** @@ -44,17 +44,8 @@ class TimeoutOperator implements Operator { * @extends {Ignored} */ class TimeoutSubscriber extends Subscriber { - private index: number = 0; - private _previousIndex: number = 0; - private action: Subscription = null; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } + private action: Action> = null; constructor(destination: Subscriber, private absoluteTimeout: boolean, @@ -65,56 +56,36 @@ class TimeoutSubscriber extends Subscriber { this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (source.previousIndex === currentIndex) { - source.notifyTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutSubscriber): void { + subscriber.error(subscriber.errorToSend || new TimeoutError()); } private scheduleTimeout(): void { - const currentIndex = this.index; - const timeoutState = { subscriber: this, index: currentIndex }; - - this.cancelTimeout(); - this.action = this.scheduler.schedule( - TimeoutSubscriber.dispatchTimeout, this.waitFor, timeoutState - ); - this.add(this.action); - - this.index++; - this._previousIndex = currentIndex; - } - - private cancelTimeout(): void { const { action } = this; - if (action !== null) { - this.remove(action); - action.unsubscribe(); - this.action = null; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutSubscriber.dispatchTimeout, this.waitFor, this + ))); } } protected _next(value: T): void { - this.destination.next(value); - if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any): void { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete(): void { - this.destination.complete(); - this._hasCompleted = true; - } - - notifyTimeout(): void { - this.error(this.errorToSend || new TimeoutError()); + private _unsubscribe() { + this.action = null; + this.scheduler = null; + this.errorToSend = null; } } diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index bb1a126492..1ef01222da 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -1,8 +1,9 @@ +import { Action } from '../scheduler/Action'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Scheduler } from '../Scheduler'; import { async } from '../scheduler/async'; -import { Subscription, TeardownLogic } from '../Subscription'; +import { TeardownLogic } from '../Subscription'; import { Observable, ObservableInput } from '../Observable'; import { isDate } from '../util/isDate'; import { OuterSubscriber } from '../OuterSubscriber'; @@ -49,81 +50,52 @@ class TimeoutWithOperator implements Operator { * @extends {Ignored} */ class TimeoutWithSubscriber extends OuterSubscriber { - private timeoutSubscription: Subscription = undefined; - private action: Subscription = null; - private index: number = 0; - private _previousIndex: number = 0; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } - constructor(public destination: Subscriber, + private action: Action> = null; + + constructor(destination: Subscriber, private absoluteTimeout: boolean, private waitFor: number, private withObservable: ObservableInput, private scheduler: Scheduler) { - super(); - destination.add(this); + super(destination); this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (!source.hasCompleted && source.previousIndex === currentIndex) { - source.handleTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutWithSubscriber): void { + const { withObservable } = subscriber; + subscriber.unsubscribe(); + subscriber.closed = false; + subscriber.isStopped = false; + subscriber.add(subscribeToResult(subscriber, withObservable)); } private scheduleTimeout(): void { - const currentIndex = this.index; - const timeoutState = { subscriber: this, index: currentIndex }; - - this.cancelTimeout(); - this.action = this.scheduler.schedule( - TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState - ); - this.add(this.action); - - this.index++; - this._previousIndex = currentIndex; - } - - private cancelTimeout(): void { const { action } = this; - if (action !== null) { - this.remove(action); - action.unsubscribe(); - this.action = null; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this + ))); } } - protected _next(value: T) { - this.destination.next(value); + protected _next(value: T): void { if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any) { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete() { - this.destination.complete(); - this._hasCompleted = true; - } - - handleTimeout(): void { - if (!this.closed) { - const withObservable = this.withObservable; - this.unsubscribe(); - this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable)); - } + private _unsubscribe() { + this.action = null; + this.scheduler = null; + this.withObservable = null; } }