From 1b89298c050c44d3765c0e32cc7a8140a3d0da57 Mon Sep 17 00:00:00 2001 From: OJ Kwon <kwon.ohjoong@gmail.com> Date: Sat, 19 Mar 2016 10:24:08 -0700 Subject: [PATCH] fix(bufferToggle): accepts closing selector returns promise relates to #1246 --- spec/operators/bufferToggle-spec.ts | 39 ++++++++++++ src/operator/bufferToggle.ts | 96 ++++++++++++++--------------- 2 files changed, 87 insertions(+), 48 deletions(-) diff --git a/spec/operators/bufferToggle-spec.ts b/spec/operators/bufferToggle-spec.ts index d8cf165cc6c..4d74308e3f2 100644 --- a/spec/operators/bufferToggle-spec.ts +++ b/spec/operators/bufferToggle-spec.ts @@ -1,4 +1,5 @@ import * as Rx from '../../dist/cjs/Rx'; +import {DoneSignature} from '../helpers/test-helper'; declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; const Observable = Rx.Observable; @@ -341,4 +342,42 @@ describe('Observable.prototype.bufferToggle', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should accept closing selector returns promise resolves', (done: DoneSignature) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + const expected = [[1]]; + + e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); })) + .subscribe((x) => { + expect(x).toEqual(expected.shift()); }, + done.fail, + () => { + expect(expected.length).toBe(0); + done(); + }); + }); + + it('should accept closing selector returns promise rejects', (done: DoneSignature) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + + const expected = 42; + + e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); })) + .subscribe((x) => { + done.fail(); + }, (x) => { + expect(x).toBe(expected); + done(); + }, () => { + done.fail(); + }); + }); }); \ No newline at end of file diff --git a/src/operator/bufferToggle.ts b/src/operator/bufferToggle.ts index 55328b736c8..7ea51c32253 100644 --- a/src/operator/bufferToggle.ts +++ b/src/operator/bufferToggle.ts @@ -1,9 +1,11 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; -import {Observable} from '../Observable'; +import {Observable, SubscribableOrPromise} from '../Observable'; import {Subscription} from '../Subscription'; -import {tryCatch} from '../util/tryCatch'; -import {errorObject} from '../util/errorObject'; + +import {subscribeToResult} from '../util/subscribeToResult'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; /** * Buffers values from the source by opening the buffer via signals from an @@ -22,18 +24,18 @@ import {errorObject} from '../util/errorObject'; * @owner Observable */ export function bufferToggle<T, O>(openings: Observable<O>, - closingSelector: (value: O) => Observable<any>): Observable<T[]> { + closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]> { return this.lift(new BufferToggleOperator<T, O>(openings, closingSelector)); } export interface BufferToggleSignature<T> { - <O>(openings: Observable<O>, closingSelector: (value: O) => Observable<any>): Observable<T[]>; + <O>(openings: Observable<O>, closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]>; } class BufferToggleOperator<T, O> implements Operator<T, T[]> { constructor(private openings: Observable<O>, - private closingSelector: (value: O) => Observable<any>) { + private closingSelector: (value: O) => SubscribableOrPromise<any> | void) { } call(subscriber: Subscriber<T[]>): Subscriber<T> { @@ -46,17 +48,17 @@ interface BufferContext<T> { subscription: Subscription; } -class BufferToggleSubscriber<T, O> extends Subscriber<T> { +class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> { private contexts: Array<BufferContext<T>> = []; constructor(destination: Subscriber<T[]>, private openings: Observable<O>, - private closingSelector: (value: O) => Observable<any>) { + private closingSelector: (value: O) => SubscribableOrPromise<any> | void) { super(destination); this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this))); } - protected _next(value: T) { + protected _next(value: T): void { const contexts = this.contexts; const len = contexts.length; for (let i = 0; i < len; i++) { @@ -64,7 +66,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> { } } - protected _error(err: any) { + protected _error(err: any): void { const contexts = this.contexts; while (contexts.length > 0) { const context = contexts.shift(); @@ -76,7 +78,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> { super._error(err); } - protected _complete() { + protected _complete(): void { const contexts = this.contexts; while (contexts.length > 0) { const context = contexts.shift(); @@ -89,27 +91,29 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> { super._complete(); } - openBuffer(value: O) { - const closingSelector = this.closingSelector; - const contexts = this.contexts; - - let closingNotifier = tryCatch(closingSelector)(value); - if (closingNotifier === errorObject) { - this._error(errorObject.e); - } else { - let context = { - buffer: <T[]>[], - subscription: new Subscription() - }; - contexts.push(context); - const subscriber = new BufferToggleClosingsSubscriber<T>(this, context); - const subscription = closingNotifier.subscribe(subscriber); - context.subscription.add(subscription); - this.add(subscription); + openBuffer(value: O): void { + try { + const closingSelector = this.closingSelector; + const closingNotifier = closingSelector.call(this, value); + if (closingNotifier) { + this.trySubscribe(closingNotifier); + } + } catch (err) { + this._error(err); } } - closeBuffer(context: BufferContext<T>) { + notifyNext(outerValue: any, innerValue: O, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber<T, O>): void { + this.closeBuffer(outerValue); + } + + notifyComplete(innerSub: InnerSubscriber<T, O>): void { + this.closeBuffer((<any> innerSub).context); + } + + private closeBuffer(context: BufferContext<T>): void { const contexts = this.contexts; if (contexts === null) { return; @@ -120,34 +124,30 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> { this.remove(subscription); subscription.unsubscribe(); } -} -class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> { - constructor(private parent: BufferToggleSubscriber<T, O>) { - super(null); - } + private trySubscribe(closingNotifier: any): void { + const contexts = this.contexts; - protected _next(value: O) { - this.parent.openBuffer(value); - } + const buffer: Array<T> = []; + const subscription = new Subscription(); + const context = { buffer, subscription }; + contexts.push(context); - protected _error(err: any) { - this.parent.error(err); - } + const innerSubscription = subscribeToResult(this, closingNotifier, <any>context); + (<any> innerSubscription).context = context; - protected _complete() { - // noop + this.add(innerSubscription); + subscription.add(innerSubscription); } } -class BufferToggleClosingsSubscriber<T> extends Subscriber<any> { - constructor(private parent: BufferToggleSubscriber<T, any>, - private context: { subscription: any, buffer: T[] }) { +class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> { + constructor(private parent: BufferToggleSubscriber<T, O>) { super(null); } - protected _next() { - this.parent.closeBuffer(this.context); + protected _next(value: O) { + this.parent.openBuffer(value); } protected _error(err: any) { @@ -155,6 +155,6 @@ class BufferToggleClosingsSubscriber<T> extends Subscriber<any> { } protected _complete() { - this.parent.closeBuffer(this.context); + // noop } }