Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(windowTime): clarify type definition of windowTime #2277

Merged
merged 1 commit into from
Jan 27, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions src/operator/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,30 @@ interface CreationState<T> {
scheduler: IScheduler;
}

interface TimeSpanOnlyState<T> {
window: Subject<T>;
windowTimeSpan: number;
subscriber: WindowTimeSubscriber<T>;
}

interface CloseWindowContext<T> {
action: Action<CreationState<T>>;
subscription: Subscription;
}

interface CloseState<T> {
subscriber: WindowTimeSubscriber<T>;
window: Subject<T>;
context: CloseWindowContext<T>;
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class WindowTimeSubscriber<T> extends Subscriber<T> {
private windows: Subject<T>[] = [];
private windows: Array<Subject<T>> = [];

constructor(protected destination: Subscriber<Observable<T>>,
private windowTimeSpan: number,
Expand All @@ -98,18 +115,18 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
super(destination);
if (windowCreationInterval !== null && windowCreationInterval >= 0) {
let window = this.openWindow();
const closeState = { subscriber: this, window, context: <any>null };
const closeState: CloseState<T> = { subscriber: this, window, context: <any>null };
const creationState: CreationState<T> = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler };
this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, closeState));
this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, creationState));
} else {
let window = this.openWindow();
const timeSpanOnlyState = { subscriber: this, window, windowTimeSpan };
const timeSpanOnlyState: TimeSpanOnlyState<T> = { subscriber: this, window, windowTimeSpan };
this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState));
}
}

protected _next(value: T) {
protected _next(value: T): void {
const windows = this.windows;
const len = windows.length;
for (let i = 0; i < len; i++) {
Expand All @@ -120,15 +137,15 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
}
}

protected _error(err: any) {
protected _error(err: any): void {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().error(err);
}
this.destination.error(err);
}

protected _complete() {
protected _complete(): void {
const windows = this.windows;
while (windows.length > 0) {
const window = windows.shift();
Expand All @@ -139,28 +156,22 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
this.destination.complete();
}

openWindow(): Subject<T> {
public openWindow(): Subject<T> {
const window = new Subject<T>();
this.windows.push(window);
const destination = this.destination;
destination.next(window);
return window;
}

closeWindow(window: Subject<T>) {
public closeWindow(window: Subject<T>): void {
window.complete();
const windows = this.windows;
windows.splice(windows.indexOf(window), 1);
}
}

interface TimeSpanOnlyState<T> {
window: Subject<any>;
windowTimeSpan: number;
subscriber: WindowTimeSubscriber<T>;
}

function dispatchWindowTimeSpanOnly<T>(this: Action<TimeSpanOnlyState<T>>, state: TimeSpanOnlyState<T>) {
function dispatchWindowTimeSpanOnly<T>(this: Action<TimeSpanOnlyState<T>>, state: TimeSpanOnlyState<T>): void {
const { subscriber, windowTimeSpan, window } = state;
if (window) {
window.complete();
Expand All @@ -169,30 +180,19 @@ function dispatchWindowTimeSpanOnly<T>(this: Action<TimeSpanOnlyState<T>>, state
this.schedule(state, windowTimeSpan);
}

interface Context<T> {
action: Action<CreationState<T>>;
subscription: Subscription;
}

interface DispatchArg<T> {
subscriber: WindowTimeSubscriber<T>;
window: Subject<T>;
context: Context<T>;
}

function dispatchWindowCreation<T>(this: Action<CreationState<T>>, state: CreationState<T>) {
let { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state;
let window = subscriber.openWindow();
let action = this;
let context: Context<T> = { action, subscription: <any>null };
const timeSpanState: DispatchArg<T> = { subscriber, window, context };
function dispatchWindowCreation<T>(this: Action<CreationState<T>>, state: CreationState<T>): void {
const { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state;
const window = subscriber.openWindow();
const action = this;
let context: CloseWindowContext<T> = { action, subscription: <any>null };
const timeSpanState: CloseState<T> = { subscriber, window, context };
context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, timeSpanState);
action.add(context.subscription);
action.schedule(state, windowCreationInterval);
}

function dispatchWindowClose<T>(arg: DispatchArg<T>) {
const { subscriber, window, context } = arg;
function dispatchWindowClose<T>(state: CloseState<T>): void {
const { subscriber, window, context } = state;
if (context && context.action && context.subscription) {
context.action.remove(context.subscription);
}
Expand Down