diff --git a/src/operator/windowTime.ts b/src/operator/windowTime.ts index a1f9964a5b..88687db490 100644 --- a/src/operator/windowTime.ts +++ b/src/operator/windowTime.ts @@ -1,13 +1,9 @@ import { IScheduler } from '../Scheduler'; -import { Action } from '../scheduler/Action'; -import { Subject } from '../Subject'; -import { Operator } from '../Operator'; import { async } from '../scheduler/async'; -import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { Subscription } from '../Subscription'; import { isNumeric } from '../util/isNumeric'; import { isScheduler } from '../util/isScheduler'; +import { windowTime as higherOrder } from '../operators'; /** * Branch out the source Observable values as a nested Observable periodically @@ -102,160 +98,5 @@ export function windowTime(this: Observable, windowCreationInterval = arguments[1]; } - return this.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)); -} - -class WindowTimeOperator implements Operator> { - - constructor(private windowTimeSpan: number, - private windowCreationInterval: number | null, - private maxWindowSize: number, - private scheduler: IScheduler) { - } - - call(subscriber: Subscriber>, source: any): any { - return source.subscribe(new WindowTimeSubscriber( - subscriber, this.windowTimeSpan, this.windowCreationInterval, this.maxWindowSize, this.scheduler - )); - } -} - -interface CreationState { - windowTimeSpan: number; - windowCreationInterval: number; - subscriber: WindowTimeSubscriber; - scheduler: IScheduler; -} - -interface TimeSpanOnlyState { - window: CountedSubject; - windowTimeSpan: number; - subscriber: WindowTimeSubscriber; - } - -interface CloseWindowContext { - action: Action>; - subscription: Subscription; -} - -interface CloseState { - subscriber: WindowTimeSubscriber; - window: CountedSubject; - context: CloseWindowContext; -} - -class CountedSubject extends Subject { - private _numberOfNextedValues: number = 0; - - next(value?: T): void { - this._numberOfNextedValues++; - super.next(value); - } - - get numberOfNextedValues(): number { - return this._numberOfNextedValues; - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class WindowTimeSubscriber extends Subscriber { - private windows: CountedSubject[] = []; - - constructor(protected destination: Subscriber>, - private windowTimeSpan: number, - private windowCreationInterval: number | null, - private maxWindowSize: number, - private scheduler: IScheduler) { - super(destination); - - const window = this.openWindow(); - if (windowCreationInterval !== null && windowCreationInterval >= 0) { - const closeState: CloseState = { subscriber: this, window, context: null }; - const creationState: CreationState = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler }; - this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, closeState)); - this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, creationState)); - } else { - const timeSpanOnlyState: TimeSpanOnlyState = { subscriber: this, window, windowTimeSpan }; - this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState)); - } - } - - protected _next(value: T): void { - const windows = this.windows; - const len = windows.length; - for (let i = 0; i < len; i++) { - const window = windows[i]; - if (!window.closed) { - window.next(value); - if (window.numberOfNextedValues >= this.maxWindowSize) { - this.closeWindow(window); - } - } - } - } - - protected _error(err: any): void { - const windows = this.windows; - while (windows.length > 0) { - windows.shift().error(err); - } - this.destination.error(err); - } - - protected _complete(): void { - const windows = this.windows; - while (windows.length > 0) { - const window = windows.shift(); - if (!window.closed) { - window.complete(); - } - } - this.destination.complete(); - } - - public openWindow(): CountedSubject { - const window = new CountedSubject(); - this.windows.push(window); - const destination = this.destination; - destination.next(window); - return window; - } - - public closeWindow(window: CountedSubject): void { - window.complete(); - const windows = this.windows; - windows.splice(windows.indexOf(window), 1); - } -} - -function dispatchWindowTimeSpanOnly(this: Action>, state: TimeSpanOnlyState): void { - const { subscriber, windowTimeSpan, window } = state; - if (window) { - subscriber.closeWindow(window); - } - state.window = subscriber.openWindow(); - this.schedule(state, windowTimeSpan); -} - -function dispatchWindowCreation(this: Action>, state: CreationState): void { - const { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state; - const window = subscriber.openWindow(); - const action = this; - let context: CloseWindowContext = { action, subscription: null }; - const timeSpanState: CloseState = { subscriber, window, context }; - context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, timeSpanState); - action.add(context.subscription); - action.schedule(state, windowCreationInterval); -} - -function dispatchWindowClose(state: CloseState): void { - const { subscriber, window, context } = state; - if (context && context.action && context.subscription) { - context.action.remove(context.subscription); - } - subscriber.closeWindow(window); + return higherOrder(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 8ddcbae9a0..dabd6dadcc 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -23,4 +23,5 @@ export { switchMap } from './switchMap'; export { takeLast } from './takeLast'; export { tap } from './tap'; export { window } from './window'; +export { windowTime } from './windowTime'; export { windowToggle } from './windowToggle'; diff --git a/src/operators/windowTime.ts b/src/operators/windowTime.ts new file mode 100644 index 0000000000..3dc6de4257 --- /dev/null +++ b/src/operators/windowTime.ts @@ -0,0 +1,262 @@ +import { IScheduler } from '../Scheduler'; +import { Action } from '../scheduler/Action'; +import { Subject } from '../Subject'; +import { Operator } from '../Operator'; +import { async } from '../scheduler/async'; +import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; +import { Subscription } from '../Subscription'; +import { isNumeric } from '../util/isNumeric'; +import { isScheduler } from '../util/isScheduler'; +import { OperatorFunction } from '../interfaces'; + +/** + * Branch out the source Observable values as a nested Observable periodically + * in time. + * + * It's like {@link bufferTime}, but emits a nested + * Observable instead of an array. + * + * + * + * Returns an Observable that emits windows of items it collects from the source + * Observable. The output Observable starts a new window periodically, as + * determined by the `windowCreationInterval` argument. It emits each window + * after a fixed timespan, specified by the `windowTimeSpan` argument. When the + * source Observable completes or encounters an error, the output Observable + * emits the current window and propagates the notification from the source + * Observable. If `windowCreationInterval` is not provided, the output + * Observable starts a new window when the previous window of duration + * `windowTimeSpan` completes. If `maxWindowCount` is provided, each window + * will emit at most fixed number of values. Window will complete immediately + * after emitting last value and next one still will open as specified by + * `windowTimeSpan` and `windowCreationInterval` arguments. + * + * @example In every window of 1 second each, emit at most 2 click events + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.windowTime(1000) + * .map(win => win.take(2)) // each window has at most 2 emissions + * .mergeAll(); // flatten the Observable-of-Observables + * result.subscribe(x => console.log(x)); + * + * @example Every 5 seconds start a window 1 second long, and emit at most 2 click events per window + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.windowTime(1000, 5000) + * .map(win => win.take(2)) // each window has at most 2 emissions + * .mergeAll(); // flatten the Observable-of-Observables + * result.subscribe(x => console.log(x)); + * + * @example Same as example above but with maxWindowCount instead of take + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.windowTime(1000, 5000, 2) // each window has still at most 2 emissions + * .mergeAll(); // flatten the Observable-of-Observables + * result.subscribe(x => console.log(x)); + + * @see {@link window} + * @see {@link windowCount} + * @see {@link windowToggle} + * @see {@link windowWhen} + * @see {@link bufferTime} + * + * @param {number} windowTimeSpan The amount of time to fill each window. + * @param {number} [windowCreationInterval] The interval at which to start new + * windows. + * @param {number} [maxWindowSize=Number.POSITIVE_INFINITY] Max number of + * values each window can emit before completion. + * @param {Scheduler} [scheduler=async] The scheduler on which to schedule the + * intervals that determine window boundaries. + * @return {Observable>} An observable of windows, which in turn + * are Observables. + * @method windowTime + * @owner Observable + */ +export function windowTime(windowTimeSpan: number, + scheduler?: IScheduler): OperatorFunction>; +export function windowTime(windowTimeSpan: number, + windowCreationInterval: number, + scheduler?: IScheduler): OperatorFunction>; +export function windowTime(windowTimeSpan: number, + windowCreationInterval: number, + maxWindowSize: number, + scheduler?: IScheduler): OperatorFunction>; + +export function windowTime(windowTimeSpan: number): OperatorFunction> { + let scheduler: IScheduler = async; + let windowCreationInterval: number = null; + let maxWindowSize: number = Number.POSITIVE_INFINITY; + + if (isScheduler(arguments[3])) { + scheduler = arguments[3]; + } + + if (isScheduler(arguments[2])) { + scheduler = arguments[2]; + } else if (isNumeric(arguments[2])) { + maxWindowSize = arguments[2]; + } + + if (isScheduler(arguments[1])) { + scheduler = arguments[1]; + } else if (isNumeric(arguments[1])) { + windowCreationInterval = arguments[1]; + } + + return function windowTimeOperatorFunction(source: Observable) { + return source.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)); + }; +} + +class WindowTimeOperator implements Operator> { + + constructor(private windowTimeSpan: number, + private windowCreationInterval: number | null, + private maxWindowSize: number, + private scheduler: IScheduler) { + } + + call(subscriber: Subscriber>, source: any): any { + return source.subscribe(new WindowTimeSubscriber( + subscriber, this.windowTimeSpan, this.windowCreationInterval, this.maxWindowSize, this.scheduler + )); + } +} + +interface CreationState { + windowTimeSpan: number; + windowCreationInterval: number; + subscriber: WindowTimeSubscriber; + scheduler: IScheduler; +} + +interface TimeSpanOnlyState { + window: CountedSubject; + windowTimeSpan: number; + subscriber: WindowTimeSubscriber; + } + +interface CloseWindowContext { + action: Action>; + subscription: Subscription; +} + +interface CloseState { + subscriber: WindowTimeSubscriber; + window: CountedSubject; + context: CloseWindowContext; +} + +class CountedSubject extends Subject { + private _numberOfNextedValues: number = 0; + + next(value?: T): void { + this._numberOfNextedValues++; + super.next(value); + } + + get numberOfNextedValues(): number { + return this._numberOfNextedValues; + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class WindowTimeSubscriber extends Subscriber { + private windows: CountedSubject[] = []; + + constructor(protected destination: Subscriber>, + private windowTimeSpan: number, + private windowCreationInterval: number | null, + private maxWindowSize: number, + private scheduler: IScheduler) { + super(destination); + + const window = this.openWindow(); + if (windowCreationInterval !== null && windowCreationInterval >= 0) { + const closeState: CloseState = { subscriber: this, window, context: null }; + const creationState: CreationState = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler }; + this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, closeState)); + this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, creationState)); + } else { + const timeSpanOnlyState: TimeSpanOnlyState = { subscriber: this, window, windowTimeSpan }; + this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState)); + } + } + + protected _next(value: T): void { + const windows = this.windows; + const len = windows.length; + for (let i = 0; i < len; i++) { + const window = windows[i]; + if (!window.closed) { + window.next(value); + if (window.numberOfNextedValues >= this.maxWindowSize) { + this.closeWindow(window); + } + } + } + } + + protected _error(err: any): void { + const windows = this.windows; + while (windows.length > 0) { + windows.shift().error(err); + } + this.destination.error(err); + } + + protected _complete(): void { + const windows = this.windows; + while (windows.length > 0) { + const window = windows.shift(); + if (!window.closed) { + window.complete(); + } + } + this.destination.complete(); + } + + public openWindow(): CountedSubject { + const window = new CountedSubject(); + this.windows.push(window); + const destination = this.destination; + destination.next(window); + return window; + } + + public closeWindow(window: CountedSubject): void { + window.complete(); + const windows = this.windows; + windows.splice(windows.indexOf(window), 1); + } +} + +function dispatchWindowTimeSpanOnly(this: Action>, state: TimeSpanOnlyState): void { + const { subscriber, windowTimeSpan, window } = state; + if (window) { + subscriber.closeWindow(window); + } + state.window = subscriber.openWindow(); + this.schedule(state, windowTimeSpan); +} + +function dispatchWindowCreation(this: Action>, state: CreationState): void { + const { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state; + const window = subscriber.openWindow(); + const action = this; + let context: CloseWindowContext = { action, subscription: null }; + const timeSpanState: CloseState = { subscriber, window, context }; + context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, timeSpanState); + action.add(context.subscription); + action.schedule(state, windowCreationInterval); +} + +function dispatchWindowClose(state: CloseState): void { + const { subscriber, window, context } = state; + if (context && context.action && context.subscription) { + context.action.remove(context.subscription); + } + subscriber.closeWindow(window); +}