From d399a6ae388cc42d913c1b3b629bc0a4e5519d05 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Sat, 19 Mar 2016 10:24:08 -0700 Subject: [PATCH 1/3] fix(bufferToggle): accepts closing selector returns promise relates to #1246 --- spec/operators/bufferToggle-spec.ts | 39 +++++++++++ src/operator/bufferToggle.ts | 105 +++++++++++++--------------- 2 files changed, 89 insertions(+), 55 deletions(-) diff --git a/spec/operators/bufferToggle-spec.ts b/spec/operators/bufferToggle-spec.ts index 4a82000915..e1f6edbcfc 100644 --- a/spec/operators/bufferToggle-spec.ts +++ b/spec/operators/bufferToggle-spec.ts @@ -1,5 +1,6 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; +import {DoneSignature} from '../helpers/test-helper'; declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; const Observable = Rx.Observable; @@ -342,4 +343,42 @@ describe('Observable.prototype.bufferToggle', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should accept closing selector that returns a resolved promise', (done: DoneSignature) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + const expected = [[1]]; + + e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); })) + .subscribe((x) => { + expect(x).toEqual(expected.shift()); }, + done.fail, + () => { + expect(expected.length).toBe(0); + done(); + }); + }); + + it('should accept closing selector that returns a rejected promise', (done: DoneSignature) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + + const expected = 42; + + e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); })) + .subscribe((x) => { + done.fail(); + }, (x) => { + expect(x).toBe(expected); + done(); + }, () => { + done.fail(); + }); + }); }); \ No newline at end of file diff --git a/src/operator/bufferToggle.ts b/src/operator/bufferToggle.ts index f1eb6eee72..a211209205 100644 --- a/src/operator/bufferToggle.ts +++ b/src/operator/bufferToggle.ts @@ -1,9 +1,11 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; -import {Observable} from '../Observable'; +import {Observable, SubscribableOrPromise} from '../Observable'; import {Subscription} from '../Subscription'; -import {tryCatch} from '../util/tryCatch'; -import {errorObject} from '../util/errorObject'; + +import {subscribeToResult} from '../util/subscribeToResult'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; /** * Buffers the source Observable values starting from an emission from @@ -17,7 +19,7 @@ import {errorObject} from '../util/errorObject'; * * Buffers values from the source by opening the buffer via signals from an * Observable provided to `openings`, and closing and sending the buffers when - * an Observable returned by the `closingSelector` function emits. + * a Subscribable or Promise returned by the `closingSelector` function emits. * * @example Every other second, emit the click events from the next 500ms * var clicks = Rx.Observable.fromEvent(document, 'click'); @@ -36,7 +38,7 @@ import {errorObject} from '../util/errorObject'; * @param {Observable} openings An observable of notifications to start new * buffers. * @param {function(value: O): Observable} closingSelector A function that takes - * the value emitted by the `openings` observable and returns an Observable, + * the value emitted by the `openings` observable and returns a Subscribable or Promise, * which, when it emits, signals that the associated buffer should be emitted * and cleared. * @return {Observable} An observable of arrays of buffered values. @@ -44,18 +46,18 @@ import {errorObject} from '../util/errorObject'; * @owner Observable */ export function bufferToggle(openings: Observable, - closingSelector: (value: O) => Observable): Observable { + closingSelector: (value: O) => SubscribableOrPromise | void): Observable { return this.lift(new BufferToggleOperator(openings, closingSelector)); } export interface BufferToggleSignature { - (openings: Observable, closingSelector: (value: O) => Observable): Observable; + (openings: Observable, closingSelector: (value: O) => SubscribableOrPromise | void): Observable; } class BufferToggleOperator implements Operator { constructor(private openings: Observable, - private closingSelector: (value: O) => Observable) { + private closingSelector: (value: O) => SubscribableOrPromise | void) { } call(subscriber: Subscriber): Subscriber { @@ -73,17 +75,17 @@ interface BufferContext { * @ignore * @extends {Ignored} */ -class BufferToggleSubscriber extends Subscriber { +class BufferToggleSubscriber extends OuterSubscriber { private contexts: Array> = []; constructor(destination: Subscriber, private openings: Observable, - private closingSelector: (value: O) => Observable) { + private closingSelector: (value: O) => SubscribableOrPromise | void) { super(destination); this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this))); } - protected _next(value: T) { + protected _next(value: T): void { const contexts = this.contexts; const len = contexts.length; for (let i = 0; i < len; i++) { @@ -91,7 +93,7 @@ class BufferToggleSubscriber extends Subscriber { } } - protected _error(err: any) { + protected _error(err: any): void { const contexts = this.contexts; while (contexts.length > 0) { const context = contexts.shift(); @@ -103,7 +105,7 @@ class BufferToggleSubscriber extends Subscriber { super._error(err); } - protected _complete() { + protected _complete(): void { const contexts = this.contexts; while (contexts.length > 0) { const context = contexts.shift(); @@ -116,27 +118,29 @@ class BufferToggleSubscriber extends Subscriber { super._complete(); } - openBuffer(value: O) { - const closingSelector = this.closingSelector; - const contexts = this.contexts; - - let closingNotifier = tryCatch(closingSelector)(value); - if (closingNotifier === errorObject) { - this._error(errorObject.e); - } else { - let context = { - buffer: [], - subscription: new Subscription() - }; - contexts.push(context); - const subscriber = new BufferToggleClosingsSubscriber(this, context); - const subscription = closingNotifier.subscribe(subscriber); - context.subscription.add(subscription); - this.add(subscription); + openBuffer(value: O): void { + try { + const closingSelector = this.closingSelector; + const closingNotifier = closingSelector.call(this, value); + if (closingNotifier) { + this.trySubscribe(closingNotifier); + } + } catch (err) { + this._error(err); } } - closeBuffer(context: BufferContext) { + notifyNext(outerValue: any, innerValue: O, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.closeBuffer(outerValue); + } + + notifyComplete(innerSub: InnerSubscriber): void { + this.closeBuffer(( innerSub).context); + } + + private closeBuffer(context: BufferContext): void { const contexts = this.contexts; if (contexts === null) { return; @@ -147,28 +151,20 @@ class BufferToggleSubscriber extends Subscriber { this.remove(subscription); subscription.unsubscribe(); } -} -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class BufferToggleOpeningsSubscriber extends Subscriber { - constructor(private parent: BufferToggleSubscriber) { - super(null); - } + private trySubscribe(closingNotifier: any): void { + const contexts = this.contexts; - protected _next(value: O) { - this.parent.openBuffer(value); - } + const buffer: Array = []; + const subscription = new Subscription(); + const context = { buffer, subscription }; + contexts.push(context); - protected _error(err: any) { - this.parent.error(err); - } + const innerSubscription = subscribeToResult(this, closingNotifier, context); + ( innerSubscription).context = context; - protected _complete() { - // noop + this.add(innerSubscription); + subscription.add(innerSubscription); } } @@ -177,14 +173,13 @@ class BufferToggleOpeningsSubscriber extends Subscriber { * @ignore * @extends {Ignored} */ -class BufferToggleClosingsSubscriber extends Subscriber { - constructor(private parent: BufferToggleSubscriber, - private context: { subscription: any, buffer: T[] }) { +class BufferToggleOpeningsSubscriber extends Subscriber { + constructor(private parent: BufferToggleSubscriber) { super(null); } - protected _next() { - this.parent.closeBuffer(this.context); + protected _next(value: O) { + this.parent.openBuffer(value); } protected _error(err: any) { @@ -192,6 +187,6 @@ class BufferToggleClosingsSubscriber extends Subscriber { } protected _complete() { - this.parent.closeBuffer(this.context); + // noop } } From a40c4960f0411c5631d3b0c8802c6fe3510d7363 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Sat, 19 Mar 2016 10:51:29 -0700 Subject: [PATCH 2/3] fix(bufferToggle): handle closingSelector completes immediately relates to #1487 --- spec/operators/bufferToggle-spec.ts | 32 ++++++++++++++++++++--------- src/operator/bufferToggle.ts | 25 +++++++++++++--------- 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/spec/operators/bufferToggle-spec.ts b/spec/operators/bufferToggle-spec.ts index e1f6edbcfc..db44c54c2c 100644 --- a/spec/operators/bufferToggle-spec.ts +++ b/spec/operators/bufferToggle-spec.ts @@ -1,6 +1,5 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; -import {DoneSignature} from '../helpers/test-helper'; declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; const Observable = Rx.Observable; @@ -344,7 +343,7 @@ describe('Observable.prototype.bufferToggle', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); - it('should accept closing selector that returns a resolved promise', (done: DoneSignature) => { + it('should accept closing selector that returns a resolved promise', (done: MochaDone) => { const e1 = Observable.concat(Observable.of(1), Observable.timer(10).mapTo(2), Observable.timer(10).mapTo(3), @@ -354,15 +353,16 @@ describe('Observable.prototype.bufferToggle', () => { e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); })) .subscribe((x) => { - expect(x).toEqual(expected.shift()); }, - done.fail, - () => { - expect(expected.length).toBe(0); + expect(x).to.deep.equal(expected.shift()); + }, () => { + done(new Error('should not be called')); + }, () => { + expect(expected.length).to.be.equal(0); done(); }); }); - it('should accept closing selector that returns a rejected promise', (done: DoneSignature) => { + it('should accept closing selector that returns a rejected promise', (done: MochaDone) => { const e1 = Observable.concat(Observable.of(1), Observable.timer(10).mapTo(2), Observable.timer(10).mapTo(3), @@ -373,12 +373,24 @@ describe('Observable.prototype.bufferToggle', () => { e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); })) .subscribe((x) => { - done.fail(); + done(new Error('should not be called')); }, (x) => { - expect(x).toBe(expected); + expect(x).to.equal(expected); done(); }, () => { - done.fail(); + done(new Error('should not be called')); }); }); + + it('should handle empty closing observable', () => { + const e1 = hot('--a--^---b---c---d---e---f---g---h------|'); + const subs = '^ !'; + const e2 = cold('--x-----------y--------z---| '); + const expected = '--l-----------m--------n-----------|'; + + const result = e1.bufferToggle(e2, () => Observable.empty()); + + expectObservable(result).toBe(expected, {l: [], m: [], n: []}); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); \ No newline at end of file diff --git a/src/operator/bufferToggle.ts b/src/operator/bufferToggle.ts index a211209205..857116ba57 100644 --- a/src/operator/bufferToggle.ts +++ b/src/operator/bufferToggle.ts @@ -142,14 +142,14 @@ class BufferToggleSubscriber extends OuterSubscriber { private closeBuffer(context: BufferContext): void { const contexts = this.contexts; - if (contexts === null) { - return; + + if (contexts && context) { + const { buffer, subscription } = context; + this.destination.next(buffer); + contexts.splice(contexts.indexOf(context), 1); + this.remove(subscription); + subscription.unsubscribe(); } - const { buffer, subscription } = context; - this.destination.next(buffer); - contexts.splice(contexts.indexOf(context), 1); - this.remove(subscription); - subscription.unsubscribe(); } private trySubscribe(closingNotifier: any): void { @@ -161,10 +161,15 @@ class BufferToggleSubscriber extends OuterSubscriber { contexts.push(context); const innerSubscription = subscribeToResult(this, closingNotifier, context); - ( innerSubscription).context = context; - this.add(innerSubscription); - subscription.add(innerSubscription); + if (!innerSubscription.isUnsubscribed) { + ( innerSubscription).context = context; + + this.add(innerSubscription); + subscription.add(innerSubscription); + } else { + this.closeBuffer(context); + } } } From e64c6bfcd3cfc824bff235382e9cecb09ea21cbe Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Fri, 1 Apr 2016 11:25:34 -0700 Subject: [PATCH 3/3] fix(bufferToggle): accepts promise as openings --- spec/operators/bufferToggle-spec.ts | 46 ++++++++++++++++++- src/operator/bufferToggle.ts | 71 ++++++++++------------------- 2 files changed, 68 insertions(+), 49 deletions(-) diff --git a/spec/operators/bufferToggle-spec.ts b/spec/operators/bufferToggle-spec.ts index db44c54c2c..0d78a28242 100644 --- a/spec/operators/bufferToggle-spec.ts +++ b/spec/operators/bufferToggle-spec.ts @@ -343,6 +343,48 @@ describe('Observable.prototype.bufferToggle', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + it('should accept openings resolved promise', (done: MochaDone) => { + const e1 = Observable.concat( + Observable.timer(10).mapTo(1), + Observable.timer(100).mapTo(2), + Observable.timer(150).mapTo(3), + Observable.timer(200).mapTo(4)); + + const expected = [[1]]; + + e1.bufferToggle(new Promise((resolve: any) => { resolve(42); }), () => { + return Observable.timer(50); + }).subscribe((x) => { + expect(x).to.deep.equal(expected.shift()); + }, (x) => { + done(new Error('should not be called')); + }, () => { + expect(expected.length).to.be.equal(0); + done(); + }); + }); + + it('should accept openings rejected promise', (done: MochaDone) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + + const expected = 42; + + e1.bufferToggle(new Promise((resolve: any, reject: any) => { reject(expected); }), () => { + return Observable.timer(50); + }).subscribe((x) => { + done(new Error('should not be called')); + }, (x) => { + expect(x).to.equal(expected); + done(); + }, () => { + done(new Error('should not be called')); + }); + }); + it('should accept closing selector that returns a resolved promise', (done: MochaDone) => { const e1 = Observable.concat(Observable.of(1), Observable.timer(10).mapTo(2), @@ -357,8 +399,8 @@ describe('Observable.prototype.bufferToggle', () => { }, () => { done(new Error('should not be called')); }, () => { - expect(expected.length).to.be.equal(0); - done(); + expect(expected.length).to.be.equal(0); + done(); }); }); diff --git a/src/operator/bufferToggle.ts b/src/operator/bufferToggle.ts index 857116ba57..19405cd668 100644 --- a/src/operator/bufferToggle.ts +++ b/src/operator/bufferToggle.ts @@ -35,9 +35,9 @@ import {InnerSubscriber} from '../InnerSubscriber'; * @see {@link bufferWhen} * @see {@link windowToggle} * - * @param {Observable} openings An observable of notifications to start new + * @param {SubscribableOrPromise} openings A Subscribable or Promise of notifications to start new * buffers. - * @param {function(value: O): Observable} closingSelector A function that takes + * @param {function(value: O): SubscribableOrPromise} closingSelector A function that takes * the value emitted by the `openings` observable and returns a Subscribable or Promise, * which, when it emits, signals that the associated buffer should be emitted * and cleared. @@ -45,19 +45,19 @@ import {InnerSubscriber} from '../InnerSubscriber'; * @method bufferToggle * @owner Observable */ -export function bufferToggle(openings: Observable, - closingSelector: (value: O) => SubscribableOrPromise | void): Observable { +export function bufferToggle(openings: SubscribableOrPromise, + closingSelector: (value: O) => SubscribableOrPromise): Observable { return this.lift(new BufferToggleOperator(openings, closingSelector)); } export interface BufferToggleSignature { - (openings: Observable, closingSelector: (value: O) => SubscribableOrPromise | void): Observable; + (openings: SubscribableOrPromise, closingSelector: (value: O) => SubscribableOrPromise): Observable; } class BufferToggleOperator implements Operator { - constructor(private openings: Observable, - private closingSelector: (value: O) => SubscribableOrPromise | void) { + constructor(private openings: SubscribableOrPromise, + private closingSelector: (value: O) => SubscribableOrPromise) { } call(subscriber: Subscriber): Subscriber { @@ -79,10 +79,10 @@ class BufferToggleSubscriber extends OuterSubscriber { private contexts: Array> = []; constructor(destination: Subscriber, - private openings: Observable, + private openings: SubscribableOrPromise, private closingSelector: (value: O) => SubscribableOrPromise | void) { super(destination); - this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this))); + this.add(subscribeToResult(this, openings)); } protected _next(value: T): void { @@ -118,7 +118,17 @@ class BufferToggleSubscriber extends OuterSubscriber { super._complete(); } - openBuffer(value: O): void { + notifyNext(outerValue: any, innerValue: O, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue); + } + + notifyComplete(innerSub: InnerSubscriber): void { + this.closeBuffer(( innerSub).context); + } + + private openBuffer(value: O): void { try { const closingSelector = this.closingSelector; const closingNotifier = closingSelector.call(this, value); @@ -130,16 +140,6 @@ class BufferToggleSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: any, innerValue: O, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.closeBuffer(outerValue); - } - - notifyComplete(innerSub: InnerSubscriber): void { - this.closeBuffer(( innerSub).context); - } - private closeBuffer(context: BufferContext): void { const contexts = this.contexts; @@ -162,36 +162,13 @@ class BufferToggleSubscriber extends OuterSubscriber { const innerSubscription = subscribeToResult(this, closingNotifier, context); - if (!innerSubscription.isUnsubscribed) { + if (!innerSubscription || innerSubscription.isUnsubscribed) { + this.closeBuffer(context); + } else { ( innerSubscription).context = context; this.add(innerSubscription); subscription.add(innerSubscription); - } else { - this.closeBuffer(context); } } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class BufferToggleOpeningsSubscriber extends Subscriber { - constructor(private parent: BufferToggleSubscriber) { - super(null); - } - - protected _next(value: O) { - this.parent.openBuffer(value); - } - - protected _error(err: any) { - this.parent.error(err); - } - - protected _complete() { - // noop - } -} +} \ No newline at end of file