From 8049b78913ce37e5704d8865ad992edc21e70198 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Sat, 19 Nov 2016 19:21:35 -0800 Subject: [PATCH] fix(timeout): Update timeout and timeoutWith to recycle their scheduled timeout actions. The timeout and timeoutWith operators should dispose their scheduled timeout actions on unsubscription. Also, given the new scheduling architecture, they can recycle their scheduled actions so just one action is allocated per subscription. #2134 #2135 --- spec/operators/timeout-spec.ts | 23 ++++++ spec/operators/timeoutWith-spec.ts | 26 ++++++ spec/schedulers/VirtualTimeScheduler-spec.ts | 13 ++- src/operator/timeout.ts | 69 +++++----------- src/operator/timeoutWith.ts | 86 +++++++------------- 5 files changed, 110 insertions(+), 107 deletions(-) diff --git a/spec/operators/timeout-spec.ts b/spec/operators/timeout-spec.ts index 95e66bde9b..abea44e91a 100644 --- a/spec/operators/timeout-spec.ts +++ b/spec/operators/timeout-spec.ts @@ -139,4 +139,27 @@ describe('Observable.prototype.timeout', () => { expectObservable(result).toBe(expected, values, value); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('--a--b--c---d--e--|'); + const e1subs = '^ ! '; + const expected = '--a--b--c-- '; + const unsub = ' ! '; + + const result = e1 + .lift(function(source) { + const timeoutSubscriber = this; + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source._subscribe(timeoutSubscriber); + }) + .timeout(50, null, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); diff --git a/spec/operators/timeoutWith-spec.ts b/spec/operators/timeoutWith-spec.ts index fad796032f..ffe082a03c 100644 --- a/spec/operators/timeoutWith-spec.ts +++ b/spec/operators/timeoutWith-spec.ts @@ -260,4 +260,30 @@ describe('Observable.prototype.timeoutWith', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('---a---b-----c----|'); + const e1subs = '^ ! '; + const e2 = cold( '-x---y| '); + const e2subs = ' ^ ! '; + const expected = '---a---b----x-- '; + const unsub = ' ! '; + + const result = e1 + .lift(function(source) { + const timeoutSubscriber = this; + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source._subscribe(timeoutSubscriber); + }) + .timeoutWith(40, e2, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); }); diff --git a/spec/schedulers/VirtualTimeScheduler-spec.ts b/spec/schedulers/VirtualTimeScheduler-spec.ts index 6af1fe5e96..e0e113d6cf 100644 --- a/spec/schedulers/VirtualTimeScheduler-spec.ts +++ b/spec/schedulers/VirtualTimeScheduler-spec.ts @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => { v.flush(); expect(count).to.equal(3); }); -}); \ No newline at end of file + + it('should not execute virtual actions that have been rescheduled before flush', () => { + const v = new VirtualTimeScheduler(); + let messages = []; + let action: VirtualAction = > v.schedule(function(state: string) { + messages.push(state); + }, 10, 'first message'); + action = > action.schedule('second message' , 10); + v.flush(); + expect(messages).to.deep.equal(['second message']); + }); +}); diff --git a/src/operator/timeout.ts b/src/operator/timeout.ts index 461a4e09ea..6bdde5b3a1 100644 --- a/src/operator/timeout.ts +++ b/src/operator/timeout.ts @@ -1,3 +1,4 @@ +import { Action } from '../scheduler/Action'; import { async } from '../scheduler/async'; import { isDate } from '../util/isDate'; import { Operator } from '../Operator'; @@ -5,7 +6,6 @@ import { Subscriber } from '../Subscriber'; import { Scheduler } from '../Scheduler'; import { Observable } from '../Observable'; import { TeardownLogic } from '../Subscription'; -import { Subscription } from '../Subscription'; import { TimeoutError } from '../util/TimeoutError'; /** @@ -44,17 +44,8 @@ class TimeoutOperator implements Operator { * @extends {Ignored} */ class TimeoutSubscriber extends Subscriber { - private index: number = 0; - private _previousIndex: number = 0; - private action: Subscription = null; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } + private action: Action> = null; constructor(destination: Subscriber, private absoluteTimeout: boolean, @@ -65,56 +56,36 @@ class TimeoutSubscriber extends Subscriber { this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (source.previousIndex === currentIndex) { - source.notifyTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutSubscriber): void { + subscriber.error(subscriber.errorToSend || new TimeoutError()); } private scheduleTimeout(): void { - const currentIndex = this.index; - const timeoutState = { subscriber: this, index: currentIndex }; - - this.cancelTimeout(); - this.action = this.scheduler.schedule( - TimeoutSubscriber.dispatchTimeout, this.waitFor, timeoutState - ); - this.add(this.action); - - this.index++; - this._previousIndex = currentIndex; - } - - private cancelTimeout(): void { const { action } = this; - if (action !== null) { - this.remove(action); - action.unsubscribe(); - this.action = null; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutSubscriber.dispatchTimeout, this.waitFor, this + ))); } } protected _next(value: T): void { - this.destination.next(value); - if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any): void { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete(): void { - this.destination.complete(); - this._hasCompleted = true; - } - - notifyTimeout(): void { - this.error(this.errorToSend || new TimeoutError()); + private _unsubscribe() { + this.action = null; + this.scheduler = null; + this.errorToSend = null; } } diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index bb1a126492..1ef01222da 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -1,8 +1,9 @@ +import { Action } from '../scheduler/Action'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Scheduler } from '../Scheduler'; import { async } from '../scheduler/async'; -import { Subscription, TeardownLogic } from '../Subscription'; +import { TeardownLogic } from '../Subscription'; import { Observable, ObservableInput } from '../Observable'; import { isDate } from '../util/isDate'; import { OuterSubscriber } from '../OuterSubscriber'; @@ -49,81 +50,52 @@ class TimeoutWithOperator implements Operator { * @extends {Ignored} */ class TimeoutWithSubscriber extends OuterSubscriber { - private timeoutSubscription: Subscription = undefined; - private action: Subscription = null; - private index: number = 0; - private _previousIndex: number = 0; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } - constructor(public destination: Subscriber, + private action: Action> = null; + + constructor(destination: Subscriber, private absoluteTimeout: boolean, private waitFor: number, private withObservable: ObservableInput, private scheduler: Scheduler) { - super(); - destination.add(this); + super(destination); this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (!source.hasCompleted && source.previousIndex === currentIndex) { - source.handleTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutWithSubscriber): void { + const { withObservable } = subscriber; + subscriber.unsubscribe(); + subscriber.closed = false; + subscriber.isStopped = false; + subscriber.add(subscribeToResult(subscriber, withObservable)); } private scheduleTimeout(): void { - const currentIndex = this.index; - const timeoutState = { subscriber: this, index: currentIndex }; - - this.cancelTimeout(); - this.action = this.scheduler.schedule( - TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState - ); - this.add(this.action); - - this.index++; - this._previousIndex = currentIndex; - } - - private cancelTimeout(): void { const { action } = this; - if (action !== null) { - this.remove(action); - action.unsubscribe(); - this.action = null; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this + ))); } } - protected _next(value: T) { - this.destination.next(value); + protected _next(value: T): void { if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any) { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete() { - this.destination.complete(); - this._hasCompleted = true; - } - - handleTimeout(): void { - if (!this.closed) { - const withObservable = this.withObservable; - this.unsubscribe(); - this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable)); - } + private _unsubscribe() { + this.action = null; + this.scheduler = null; + this.withObservable = null; } }