Skip to content

Commit

Permalink
fix(bufferToggle): accepts closing selector returns promise
Browse files Browse the repository at this point in the history
relates to ReactiveX#1246
  • Loading branch information
kwonoj committed Apr 1, 2016
1 parent 8da7a8f commit d399a6a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 55 deletions.
39 changes: 39 additions & 0 deletions spec/operators/bufferToggle-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import {DoneSignature} from '../helpers/test-helper';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Observable = Rx.Observable;
Expand Down Expand Up @@ -342,4 +343,42 @@ describe('Observable.prototype.bufferToggle', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should accept closing selector that returns a resolved promise', (done: DoneSignature) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Observable.timer(100).mapTo(4)
);
const expected = [[1]];

e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); }))
.subscribe((x) => {
expect(x).toEqual(expected.shift()); },
done.fail,
() => {
expect(expected.length).toBe(0);
done();
});
});

it('should accept closing selector that returns a rejected promise', (done: DoneSignature) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Observable.timer(100).mapTo(4)
);

const expected = 42;

e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); }))
.subscribe((x) => {
done.fail();
}, (x) => {
expect(x).toBe(expected);
done();
}, () => {
done.fail();
});
});
});
105 changes: 50 additions & 55 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';

/**
* Buffers the source Observable values starting from an emission from
Expand All @@ -17,7 +19,7 @@ import {errorObject} from '../util/errorObject';
*
* Buffers values from the source by opening the buffer via signals from an
* Observable provided to `openings`, and closing and sending the buffers when
* an Observable returned by the `closingSelector` function emits.
* a Subscribable or Promise returned by the `closingSelector` function emits.
*
* @example <caption>Every other second, emit the click events from the next 500ms</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
Expand All @@ -36,26 +38,26 @@ import {errorObject} from '../util/errorObject';
* @param {Observable<O>} openings An observable of notifications to start new
* buffers.
* @param {function(value: O): Observable} closingSelector A function that takes
* the value emitted by the `openings` observable and returns an Observable,
* the value emitted by the `openings` observable and returns a Subscribable or Promise,
* which, when it emits, signals that the associated buffer should be emitted
* and cleared.
* @return {Observable<T[]>} An observable of arrays of buffered values.
* @method bufferToggle
* @owner Observable
*/
export function bufferToggle<T, O>(openings: Observable<O>,
closingSelector: (value: O) => Observable<any>): Observable<T[]> {
closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]> {
return this.lift(new BufferToggleOperator<T, O>(openings, closingSelector));
}

export interface BufferToggleSignature<T> {
<O>(openings: Observable<O>, closingSelector: (value: O) => Observable<any>): Observable<T[]>;
<O>(openings: Observable<O>, closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]>;
}

class BufferToggleOperator<T, O> implements Operator<T, T[]> {

constructor(private openings: Observable<O>,
private closingSelector: (value: O) => Observable<any>) {
private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
}

call(subscriber: Subscriber<T[]>): Subscriber<T> {
Expand All @@ -73,25 +75,25 @@ interface BufferContext<T> {
* @ignore
* @extends {Ignored}
*/
class BufferToggleSubscriber<T, O> extends Subscriber<T> {
class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
private contexts: Array<BufferContext<T>> = [];

constructor(destination: Subscriber<T[]>,
private openings: Observable<O>,
private closingSelector: (value: O) => Observable<any>) {
private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
super(destination);
this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this)));
}

protected _next(value: T) {
protected _next(value: T): void {
const contexts = this.contexts;
const len = contexts.length;
for (let i = 0; i < len; i++) {
contexts[i].buffer.push(value);
}
}

protected _error(err: any) {
protected _error(err: any): void {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
Expand All @@ -103,7 +105,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
super._error(err);
}

protected _complete() {
protected _complete(): void {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
Expand All @@ -116,27 +118,29 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
super._complete();
}

openBuffer(value: O) {
const closingSelector = this.closingSelector;
const contexts = this.contexts;

let closingNotifier = tryCatch(closingSelector)(value);
if (closingNotifier === errorObject) {
this._error(errorObject.e);
} else {
let context = {
buffer: <T[]>[],
subscription: new Subscription()
};
contexts.push(context);
const subscriber = new BufferToggleClosingsSubscriber<T>(this, context);
const subscription = closingNotifier.subscribe(subscriber);
context.subscription.add(subscription);
this.add(subscription);
openBuffer(value: O): void {
try {
const closingSelector = this.closingSelector;
const closingNotifier = closingSelector.call(this, value);
if (closingNotifier) {
this.trySubscribe(closingNotifier);
}
} catch (err) {
this._error(err);
}
}

closeBuffer(context: BufferContext<T>) {
notifyNext(outerValue: any, innerValue: O,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, O>): void {
this.closeBuffer(outerValue);
}

notifyComplete(innerSub: InnerSubscriber<T, O>): void {
this.closeBuffer((<any> innerSub).context);
}

private closeBuffer(context: BufferContext<T>): void {
const contexts = this.contexts;
if (contexts === null) {
return;
Expand All @@ -147,28 +151,20 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
this.remove(subscription);
subscription.unsubscribe();
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> {
constructor(private parent: BufferToggleSubscriber<T, O>) {
super(null);
}
private trySubscribe(closingNotifier: any): void {
const contexts = this.contexts;

protected _next(value: O) {
this.parent.openBuffer(value);
}
const buffer: Array<T> = [];
const subscription = new Subscription();
const context = { buffer, subscription };
contexts.push(context);

protected _error(err: any) {
this.parent.error(err);
}
const innerSubscription = subscribeToResult(this, closingNotifier, <any>context);
(<any> innerSubscription).context = context;

protected _complete() {
// noop
this.add(innerSubscription);
subscription.add(innerSubscription);
}
}

Expand All @@ -177,21 +173,20 @@ class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> {
* @ignore
* @extends {Ignored}
*/
class BufferToggleClosingsSubscriber<T> extends Subscriber<any> {
constructor(private parent: BufferToggleSubscriber<T, any>,
private context: { subscription: any, buffer: T[] }) {
class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> {
constructor(private parent: BufferToggleSubscriber<T, O>) {
super(null);
}

protected _next() {
this.parent.closeBuffer(this.context);
protected _next(value: O) {
this.parent.openBuffer(value);
}

protected _error(err: any) {
this.parent.error(err);
}

protected _complete() {
this.parent.closeBuffer(this.context);
// noop
}
}

0 comments on commit d399a6a

Please sign in to comment.