Skip to content

Commit

Permalink
refactor(window): use subscribeToResult with outerSubscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Feb 4, 2016
1 parent 96fc860 commit 2878aed
Showing 1 changed file with 24 additions and 32 deletions.
56 changes: 24 additions & 32 deletions src/operator/window.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Subject} from '../Subject';

import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

export function window<T>(closingNotifier: Observable<any>): Observable<Observable<T>> {
return this.lift(new WindowOperator(closingNotifier));
}
Expand All @@ -17,31 +21,45 @@ class WindowOperator<T> implements Operator<T, Observable<T>> {
}
}

class WindowSubscriber<T> extends Subscriber<T> {
class WindowSubscriber<T, R> extends OuterSubscriber<T, R> {
private window: Subject<T>;

constructor(protected destination: Subscriber<Observable<T>>,
private closingNotifier: Observable<any>) {
super(destination);
this.add(closingNotifier.subscribe(new WindowClosingNotifierSubscriber(this)));
this.add(subscribeToResult(this, closingNotifier));
this.openWindow();
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.openWindow();
}

protected _next(value: T) {
notifyError(error: any, innerSub: InnerSubscriber<T, R>): void {
this._error(error);
}

notifyComplete(innerSub: InnerSubscriber<T, R>): void {
this._complete();
}

protected _next(value: T): void {
this.window.next(value);
}

protected _error(err: any) {
protected _error(err: any): void {
this.window.error(err);
this.destination.error(err);
}

protected _complete() {
protected _complete(): void {
this.window.complete();
this.destination.complete();
}

openWindow() {
private openWindow(): void {
const prevWindow = this.window;
if (prevWindow) {
prevWindow.complete();
Expand All @@ -51,30 +69,4 @@ class WindowSubscriber<T> extends Subscriber<T> {
destination.add(newWindow);
destination.next(newWindow);
}

errorWindow(err: any) {
this._error(err);
}

completeWindow() {
this._complete();
}
}

class WindowClosingNotifierSubscriber extends Subscriber<any> {
constructor(private parent: WindowSubscriber<any>) {
super();
}

protected _next() {
this.parent.openWindow();
}

protected _error(err: any) {
this.parent.errorWindow(err);
}

protected _complete() {
this.parent.completeWindow();
}
}

0 comments on commit 2878aed

Please sign in to comment.