Skip to content

Commit

Permalink
Merge pull request ReactiveX#2187 from Podlas29/window-time-with-max-…
Browse files Browse the repository at this point in the history
…size

feat(windowTime): maxWindowSize parameter in windowTime operator
  • Loading branch information
jayphelps authored Feb 15, 2017
2 parents 02cbda6 + 381be3f commit db8dc77
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 24 deletions.
59 changes: 48 additions & 11 deletions spec/operators/windowTime-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,51 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '--a--(b|) ');
const y = cold( '-d--e| ');
const z = cold( '-g--h| ');
const values = { x: x, y: y, z: z };
const values = { x, y, z };

const result = source.windowTime(50, 100, rxTestScheduler);

expectObservable(result).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should close windows after max count is reached', () => {
const source = hot('--1--2--^--a--b--c--d--e--f--g-----|');
const subs = '^ !';
const timeSpan = time( '----------|');
// 100 frames 0---------1---------2------|
const expected = 'x---------y---------z------|';
const x = cold( '---a--(b|) ');
const y = cold( '--d--(e|) ');
const z = cold( '-g-----|');
const values = { x, y, z };

const result = source.windowTime(timeSpan, null, 2, rxTestScheduler);

expectObservable(result).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should close window after max count is reached with' +
'windowCreationInterval', () => {
const source = hot('--1--2--^-a--b--c--de-f---g--h--i-|');
const subs = '^ !';
// 100 frames 0---------1---------2-----|
// 50 ----|
// 50 ----|
// 50 ----|
const expected = 'x---------y---------z-----|';
const x = cold( '--a--(b|) ');
const y = cold( '-de-(f|) ');
const z = cold( '-h--i| ');
const values = { x, y, z };

const result = source.windowTime(50, 100, 3, rxTestScheduler);

expectObservable(result).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should emit windows given windowTimeSpan', () => {
const source = hot('--1--2--^--a--b--c--d--e--f--g--h--|');
const subs = '^ !';
Expand All @@ -40,9 +77,9 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a--b--c| ');
const y = cold( '--d--e--f-| ');
const z = cold( '-g--h--|');
const values = { x: x, y: y, z: z };
const values = { x, y, z };

const result = source.windowTime(timeSpan, null, rxTestScheduler);
const result = source.windowTime(timeSpan, rxTestScheduler);

expectObservable(result).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -61,7 +98,7 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a-| ');
const y = cold( '--d--(e|) ');
const z = cold( '-g--h| ');
const values = { x: x, y: y, z: z };
const values = { x, y, z };

const result = source.windowTime(timeSpan, interval, rxTestScheduler);

Expand All @@ -74,7 +111,7 @@ describe('Observable.prototype.windowTime', () => {
const subs = '(^!)';
const expected = '(w|)';
const w = cold('|');
const expectedValues = { w: w };
const expectedValues = { w };
const timeSpan = time('-----|');
const interval = time('----------|');

Expand All @@ -89,7 +126,7 @@ describe('Observable.prototype.windowTime', () => {
const subs = '(^!)';
const expected = '(w|)';
const w = cold('(a|)');
const expectedValues = { w: w };
const expectedValues = { w };
const timeSpan = time('-----|');
const interval = time('----------|');

Expand All @@ -110,7 +147,7 @@ describe('Observable.prototype.windowTime', () => {
const c = cold( '---| ');
const d = cold( '--');
const unsub = ' !';
const expectedValues = { a: a, b: b, c: c, d: d };
const expectedValues = { a, b, c, d };

const result = source.windowTime(timeSpan, interval, rxTestScheduler);

Expand All @@ -123,7 +160,7 @@ describe('Observable.prototype.windowTime', () => {
const subs = '(^!)';
const expected = '(w#)';
const w = cold('#');
const expectedValues = { w: w };
const expectedValues = { w };
const timeSpan = time('-----|');
const interval = time('----------|');

Expand All @@ -146,7 +183,7 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a-| ');
const y = cold( '--d--(e|) ');
const z = cold( '-g--h| ');
const values = { x: x, y: y, z: z };
const values = { x, y, z };

const result = source.windowTime(timeSpan, interval, rxTestScheduler);

Expand All @@ -168,7 +205,7 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a-| ');
const y = cold( '-- ');
const unsub = ' ! ';
const values = { x: x, y: y };
const values = { x, y };

const result = source.windowTime(timeSpan, interval, rxTestScheduler);

Expand All @@ -189,7 +226,7 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a-| ');
const y = cold( '--d-- ');
const unsub = ' ! ';
const values = { x: x, y: y };
const values = { x, y };

const result = source
.mergeMap((x: string) => Observable.of(x))
Expand Down
87 changes: 74 additions & 13 deletions src/operator/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { async } from '../scheduler/async';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { isNumeric } from '../util/isNumeric';
import { isScheduler } from '../util/isScheduler';

/**
* Branch out the source Observable values as a nested Observable periodically
Expand All @@ -24,7 +26,10 @@ import { Subscription } from '../Subscription';
* emits the current window and propagates the notification from the source
* Observable. If `windowCreationInterval` is not provided, the output
* Observable starts a new window when the previous window of duration
* `windowTimeSpan` completes.
* `windowTimeSpan` completes. If `maxWindowCount` is provided, each window
* will emit at most fixed number of values. Window will complete immediately
* after emitting last value and next one still will open as specified by
* `windowTimeSpan` and `windowCreationInterval` arguments.
*
* @example <caption>In every window of 1 second each, emit at most 2 click events</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
Expand All @@ -40,6 +45,12 @@ import { Subscription } from '../Subscription';
* .mergeAll(); // flatten the Observable-of-Observables
* result.subscribe(x => console.log(x));
*
* @example <caption>Same as example above but with maxWindowCount instead of take</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var result = clicks.windowTime(1000, 5000, 2) // each window has still at most 2 emissions
* .mergeAll(); // flatten the Observable-of-Observables
* result.subscribe(x => console.log(x));
* @see {@link window}
* @see {@link windowCount}
* @see {@link windowToggle}
Expand All @@ -49,6 +60,8 @@ import { Subscription } from '../Subscription';
* @param {number} windowTimeSpan The amount of time to fill each window.
* @param {number} [windowCreationInterval] The interval at which to start new
* windows.
* @param {number} [maxWindowSize=Number.POSITIVE_INFINITY] Max number of
* values each window can emit before completion.
* @param {Scheduler} [scheduler=async] The scheduler on which to schedule the
* intervals that determine window boundaries.
* @return {Observable<Observable<T>>} An observable of windows, which in turn
Expand All @@ -57,21 +70,52 @@ import { Subscription } from '../Subscription';
* @owner Observable
*/
export function windowTime<T>(this: Observable<T>, windowTimeSpan: number,
windowCreationInterval: number = null,
scheduler: IScheduler = async): Observable<Observable<T>> {
return this.lift(new WindowTimeOperator<T>(windowTimeSpan, windowCreationInterval, scheduler));
scheduler?: IScheduler): Observable<Observable<T>>;
export function windowTime<T>(this: Observable<T>, windowTimeSpan: number,
windowCreationInterval: number,
scheduler?: IScheduler): Observable<Observable<T>>;
export function windowTime<T>(this: Observable<T>, windowTimeSpan: number,
windowCreationInterval: number,
maxWindowSize: number,
scheduler?: IScheduler): Observable<Observable<T>>;

export function windowTime<T>(this: Observable<T>,
windowTimeSpan: number): Observable<Observable<T>> {

let scheduler: IScheduler = async;
let windowCreationInterval: number = null;
let maxWindowSize: number = Number.POSITIVE_INFINITY;

if (isScheduler(arguments[3])) {
scheduler = arguments[3];
}

if (isScheduler(arguments[2])) {
scheduler = arguments[2];
} else if (isNumeric(arguments[2])) {
maxWindowSize = arguments[2];
}

if (isScheduler(arguments[1])) {
scheduler = arguments[1];
} else if (isNumeric(arguments[1])) {
windowCreationInterval = arguments[1];
}

return this.lift(new WindowTimeOperator<T>(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler));
}

class WindowTimeOperator<T> implements Operator<T, Observable<T>> {

constructor(private windowTimeSpan: number,
private windowCreationInterval: number,
private windowCreationInterval: number | null,
private maxWindowSize: number,
private scheduler: IScheduler) {
}

call(subscriber: Subscriber<Observable<T>>, source: any): any {
return source.subscribe(new WindowTimeSubscriber(
subscriber, this.windowTimeSpan, this.windowCreationInterval, this.scheduler
subscriber, this.windowTimeSpan, this.windowCreationInterval, this.maxWindowSize, this.scheduler
));
}
}
Expand All @@ -84,7 +128,7 @@ interface CreationState<T> {
}

interface TimeSpanOnlyState<T> {
window: Subject<T>;
window: CountedSubject<T>;
windowTimeSpan: number;
subscriber: WindowTimeSubscriber<T>;
}
Expand All @@ -96,21 +140,35 @@ interface CloseWindowContext<T> {

interface CloseState<T> {
subscriber: WindowTimeSubscriber<T>;
window: Subject<T>;
window: CountedSubject<T>;
context: CloseWindowContext<T>;
}

class CountedSubject<T> extends Subject<T> {
private _numberOfNextedValues: number = 0;

next(value?: T): void {
this._numberOfNextedValues++;
super.next(value);
}

get numberOfNextedValues(): number {
return this._numberOfNextedValues;
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class WindowTimeSubscriber<T> extends Subscriber<T> {
private windows: Array<Subject<T>> = [];
private windows: CountedSubject<T>[] = [];

constructor(protected destination: Subscriber<Observable<T>>,
private windowTimeSpan: number,
private windowCreationInterval: number,
private windowCreationInterval: number | null,
private maxWindowSize: number,
private scheduler: IScheduler) {
super(destination);

Expand All @@ -133,6 +191,9 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
const window = windows[i];
if (!window.closed) {
window.next(value);
if (window.numberOfNextedValues >= this.maxWindowSize) {
this.closeWindow(window);
}
}
}
}
Expand All @@ -156,15 +217,15 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
this.destination.complete();
}

public openWindow(): Subject<T> {
const window = new Subject<T>();
public openWindow(): CountedSubject<T> {
const window = new CountedSubject<T>();
this.windows.push(window);
const destination = this.destination;
destination.next(window);
return window;
}

public closeWindow(window: Subject<T>): void {
public closeWindow(window: CountedSubject<T>): void {
window.complete();
const windows = this.windows;
windows.splice(windows.indexOf(window), 1);
Expand Down

0 comments on commit db8dc77

Please sign in to comment.