diff --git a/spec/operators/bufferToggle-spec.ts b/spec/operators/bufferToggle-spec.ts index 4a82000915..0d78a28242 100644 --- a/spec/operators/bufferToggle-spec.ts +++ b/spec/operators/bufferToggle-spec.ts @@ -342,4 +342,97 @@ describe('Observable.prototype.bufferToggle', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should accept openings resolved promise', (done: MochaDone) => { + const e1 = Observable.concat( + Observable.timer(10).mapTo(1), + Observable.timer(100).mapTo(2), + Observable.timer(150).mapTo(3), + Observable.timer(200).mapTo(4)); + + const expected = [[1]]; + + e1.bufferToggle(new Promise((resolve: any) => { resolve(42); }), () => { + return Observable.timer(50); + }).subscribe((x) => { + expect(x).to.deep.equal(expected.shift()); + }, (x) => { + done(new Error('should not be called')); + }, () => { + expect(expected.length).to.be.equal(0); + done(); + }); + }); + + it('should accept openings rejected promise', (done: MochaDone) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + + const expected = 42; + + e1.bufferToggle(new Promise((resolve: any, reject: any) => { reject(expected); }), () => { + return Observable.timer(50); + }).subscribe((x) => { + done(new Error('should not be called')); + }, (x) => { + expect(x).to.equal(expected); + done(); + }, () => { + done(new Error('should not be called')); + }); + }); + + it('should accept closing selector that returns a resolved promise', (done: MochaDone) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + const expected = [[1]]; + + e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); })) + .subscribe((x) => { + expect(x).to.deep.equal(expected.shift()); + }, () => { + done(new Error('should not be called')); + }, () => { + expect(expected.length).to.be.equal(0); + done(); + }); + }); + + it('should accept closing selector that returns a rejected promise', (done: MochaDone) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + + const expected = 42; + + e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); })) + .subscribe((x) => { + done(new Error('should not be called')); + }, (x) => { + expect(x).to.equal(expected); + done(); + }, () => { + done(new Error('should not be called')); + }); + }); + + it('should handle empty closing observable', () => { + const e1 = hot('--a--^---b---c---d---e---f---g---h------|'); + const subs = '^ !'; + const e2 = cold('--x-----------y--------z---| '); + const expected = '--l-----------m--------n-----------|'; + + const result = e1.bufferToggle(e2, () => Observable.empty()); + + expectObservable(result).toBe(expected, {l: [], m: [], n: []}); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); \ No newline at end of file diff --git a/src/operator/bufferToggle.ts b/src/operator/bufferToggle.ts index f1eb6eee72..19405cd668 100644 --- a/src/operator/bufferToggle.ts +++ b/src/operator/bufferToggle.ts @@ -1,9 +1,11 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; -import {Observable} from '../Observable'; +import {Observable, SubscribableOrPromise} from '../Observable'; import {Subscription} from '../Subscription'; -import {tryCatch} from '../util/tryCatch'; -import {errorObject} from '../util/errorObject'; + +import {subscribeToResult} from '../util/subscribeToResult'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; /** * Buffers the source Observable values starting from an emission from @@ -17,7 +19,7 @@ import {errorObject} from '../util/errorObject'; * * Buffers values from the source by opening the buffer via signals from an * Observable provided to `openings`, and closing and sending the buffers when - * an Observable returned by the `closingSelector` function emits. + * a Subscribable or Promise returned by the `closingSelector` function emits. * * @example Every other second, emit the click events from the next 500ms * var clicks = Rx.Observable.fromEvent(document, 'click'); @@ -33,29 +35,29 @@ import {errorObject} from '../util/errorObject'; * @see {@link bufferWhen} * @see {@link windowToggle} * - * @param {Observable} openings An observable of notifications to start new + * @param {SubscribableOrPromise} openings A Subscribable or Promise of notifications to start new * buffers. - * @param {function(value: O): Observable} closingSelector A function that takes - * the value emitted by the `openings` observable and returns an Observable, + * @param {function(value: O): SubscribableOrPromise} closingSelector A function that takes + * the value emitted by the `openings` observable and returns a Subscribable or Promise, * which, when it emits, signals that the associated buffer should be emitted * and cleared. * @return {Observable} An observable of arrays of buffered values. * @method bufferToggle * @owner Observable */ -export function bufferToggle(openings: Observable, - closingSelector: (value: O) => Observable): Observable { +export function bufferToggle(openings: SubscribableOrPromise, + closingSelector: (value: O) => SubscribableOrPromise): Observable { return this.lift(new BufferToggleOperator(openings, closingSelector)); } export interface BufferToggleSignature { - (openings: Observable, closingSelector: (value: O) => Observable): Observable; + (openings: SubscribableOrPromise, closingSelector: (value: O) => SubscribableOrPromise): Observable; } class BufferToggleOperator implements Operator { - constructor(private openings: Observable, - private closingSelector: (value: O) => Observable) { + constructor(private openings: SubscribableOrPromise, + private closingSelector: (value: O) => SubscribableOrPromise) { } call(subscriber: Subscriber): Subscriber { @@ -73,17 +75,17 @@ interface BufferContext { * @ignore * @extends {Ignored} */ -class BufferToggleSubscriber extends Subscriber { +class BufferToggleSubscriber extends OuterSubscriber { private contexts: Array> = []; constructor(destination: Subscriber, - private openings: Observable, - private closingSelector: (value: O) => Observable) { + private openings: SubscribableOrPromise, + private closingSelector: (value: O) => SubscribableOrPromise | void) { super(destination); - this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this))); + this.add(subscribeToResult(this, openings)); } - protected _next(value: T) { + protected _next(value: T): void { const contexts = this.contexts; const len = contexts.length; for (let i = 0; i < len; i++) { @@ -91,7 +93,7 @@ class BufferToggleSubscriber extends Subscriber { } } - protected _error(err: any) { + protected _error(err: any): void { const contexts = this.contexts; while (contexts.length > 0) { const context = contexts.shift(); @@ -103,7 +105,7 @@ class BufferToggleSubscriber extends Subscriber { super._error(err); } - protected _complete() { + protected _complete(): void { const contexts = this.contexts; while (contexts.length > 0) { const context = contexts.shift(); @@ -116,82 +118,57 @@ class BufferToggleSubscriber extends Subscriber { super._complete(); } - openBuffer(value: O) { - const closingSelector = this.closingSelector; - const contexts = this.contexts; - - let closingNotifier = tryCatch(closingSelector)(value); - if (closingNotifier === errorObject) { - this._error(errorObject.e); - } else { - let context = { - buffer: [], - subscription: new Subscription() - }; - contexts.push(context); - const subscriber = new BufferToggleClosingsSubscriber(this, context); - const subscription = closingNotifier.subscribe(subscriber); - context.subscription.add(subscription); - this.add(subscription); - } + notifyNext(outerValue: any, innerValue: O, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue); } - closeBuffer(context: BufferContext) { - const contexts = this.contexts; - if (contexts === null) { - return; - } - const { buffer, subscription } = context; - this.destination.next(buffer); - contexts.splice(contexts.indexOf(context), 1); - this.remove(subscription); - subscription.unsubscribe(); + notifyComplete(innerSub: InnerSubscriber): void { + this.closeBuffer(( innerSub).context); } -} -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class BufferToggleOpeningsSubscriber extends Subscriber { - constructor(private parent: BufferToggleSubscriber) { - super(null); + private openBuffer(value: O): void { + try { + const closingSelector = this.closingSelector; + const closingNotifier = closingSelector.call(this, value); + if (closingNotifier) { + this.trySubscribe(closingNotifier); + } + } catch (err) { + this._error(err); + } } - protected _next(value: O) { - this.parent.openBuffer(value); - } + private closeBuffer(context: BufferContext): void { + const contexts = this.contexts; - protected _error(err: any) { - this.parent.error(err); + if (contexts && context) { + const { buffer, subscription } = context; + this.destination.next(buffer); + contexts.splice(contexts.indexOf(context), 1); + this.remove(subscription); + subscription.unsubscribe(); + } } - protected _complete() { - // noop - } -} + private trySubscribe(closingNotifier: any): void { + const contexts = this.contexts; -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class BufferToggleClosingsSubscriber extends Subscriber { - constructor(private parent: BufferToggleSubscriber, - private context: { subscription: any, buffer: T[] }) { - super(null); - } + const buffer: Array = []; + const subscription = new Subscription(); + const context = { buffer, subscription }; + contexts.push(context); - protected _next() { - this.parent.closeBuffer(this.context); - } + const innerSubscription = subscribeToResult(this, closingNotifier, context); - protected _error(err: any) { - this.parent.error(err); - } + if (!innerSubscription || innerSubscription.isUnsubscribed) { + this.closeBuffer(context); + } else { + ( innerSubscription).context = context; - protected _complete() { - this.parent.closeBuffer(this.context); + this.add(innerSubscription); + subscription.add(innerSubscription); + } } -} +} \ No newline at end of file