Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update bufferToggle closingSelector to accept promise / empty #1494

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions spec/operators/bufferToggle-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,97 @@ describe('Observable.prototype.bufferToggle', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should accept openings resolved promise', (done: MochaDone) => {
const e1 = Observable.concat(
Observable.timer(10).mapTo(1),
Observable.timer(100).mapTo(2),
Observable.timer(150).mapTo(3),
Observable.timer(200).mapTo(4));

const expected = [[1]];

e1.bufferToggle(new Promise((resolve: any) => { resolve(42); }), () => {
return Observable.timer(50);
}).subscribe((x) => {
expect(x).to.deep.equal(expected.shift());
}, (x) => {
done(new Error('should not be called'));
}, () => {
expect(expected.length).to.be.equal(0);
done();
});
});

it('should accept openings rejected promise', (done: MochaDone) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Observable.timer(100).mapTo(4)
);

const expected = 42;

e1.bufferToggle(new Promise((resolve: any, reject: any) => { reject(expected); }), () => {
return Observable.timer(50);
}).subscribe((x) => {
done(new Error('should not be called'));
}, (x) => {
expect(x).to.equal(expected);
done();
}, () => {
done(new Error('should not be called'));
});
});

it('should accept closing selector that returns a resolved promise', (done: MochaDone) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Observable.timer(100).mapTo(4)
);
const expected = [[1]];

e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); }))
.subscribe((x) => {
expect(x).to.deep.equal(expected.shift());
}, () => {
done(new Error('should not be called'));
}, () => {
expect(expected.length).to.be.equal(0);
done();
});
});

it('should accept closing selector that returns a rejected promise', (done: MochaDone) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Observable.timer(100).mapTo(4)
);

const expected = 42;

e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); }))
.subscribe((x) => {
done(new Error('should not be called'));
}, (x) => {
expect(x).to.equal(expected);
done();
}, () => {
done(new Error('should not be called'));
});
});

it('should handle empty closing observable', () => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------|');
const subs = '^ !';
const e2 = cold('--x-----------y--------z---| ');
const expected = '--l-----------m--------n-----------|';

const result = e1.bufferToggle(e2, () => Observable.empty());

expectObservable(result).toBe(expected, {l: [], m: [], n: []});
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});
145 changes: 61 additions & 84 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';

/**
* Buffers the source Observable values starting from an emission from
Expand All @@ -17,7 +19,7 @@ import {errorObject} from '../util/errorObject';
*
* Buffers values from the source by opening the buffer via signals from an
* Observable provided to `openings`, and closing and sending the buffers when
* an Observable returned by the `closingSelector` function emits.
* a Subscribable or Promise returned by the `closingSelector` function emits.
*
* @example <caption>Every other second, emit the click events from the next 500ms</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
Expand All @@ -33,29 +35,29 @@ import {errorObject} from '../util/errorObject';
* @see {@link bufferWhen}
* @see {@link windowToggle}
*
* @param {Observable<O>} openings An observable of notifications to start new
* @param {SubscribableOrPromise<O>} openings A Subscribable or Promise of notifications to start new
* buffers.
* @param {function(value: O): Observable} closingSelector A function that takes
* the value emitted by the `openings` observable and returns an Observable,
* @param {function(value: O): SubscribableOrPromise} closingSelector A function that takes
* the value emitted by the `openings` observable and returns a Subscribable or Promise,
* which, when it emits, signals that the associated buffer should be emitted
* and cleared.
* @return {Observable<T[]>} An observable of arrays of buffered values.
* @method bufferToggle
* @owner Observable
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to update docs in line 22 to say "an Observable or Promise returned by...".
And update docs in line 40 too.

export function bufferToggle<T, O>(openings: Observable<O>,
closingSelector: (value: O) => Observable<any>): Observable<T[]> {
export function bufferToggle<T, O>(openings: SubscribableOrPromise<O>,
closingSelector: (value: O) => SubscribableOrPromise<any>): Observable<T[]> {
return this.lift(new BufferToggleOperator<T, O>(openings, closingSelector));
}

export interface BufferToggleSignature<T> {
<O>(openings: Observable<O>, closingSelector: (value: O) => Observable<any>): Observable<T[]>;
<O>(openings: SubscribableOrPromise<O>, closingSelector: (value: O) => SubscribableOrPromise<any>): Observable<T[]>;
}

class BufferToggleOperator<T, O> implements Operator<T, T[]> {

constructor(private openings: Observable<O>,
private closingSelector: (value: O) => Observable<any>) {
constructor(private openings: SubscribableOrPromise<O>,
private closingSelector: (value: O) => SubscribableOrPromise<any>) {
}

call(subscriber: Subscriber<T[]>): Subscriber<T> {
Expand All @@ -73,25 +75,25 @@ interface BufferContext<T> {
* @ignore
* @extends {Ignored}
*/
class BufferToggleSubscriber<T, O> extends Subscriber<T> {
class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
private contexts: Array<BufferContext<T>> = [];

constructor(destination: Subscriber<T[]>,
private openings: Observable<O>,
private closingSelector: (value: O) => Observable<any>) {
private openings: SubscribableOrPromise<O>,
private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
super(destination);
this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this)));
this.add(subscribeToResult(this, openings));
}

protected _next(value: T) {
protected _next(value: T): void {
const contexts = this.contexts;
const len = contexts.length;
for (let i = 0; i < len; i++) {
contexts[i].buffer.push(value);
}
}

protected _error(err: any) {
protected _error(err: any): void {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
Expand All @@ -103,7 +105,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
super._error(err);
}

protected _complete() {
protected _complete(): void {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
Expand All @@ -116,82 +118,57 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
super._complete();
}

openBuffer(value: O) {
const closingSelector = this.closingSelector;
const contexts = this.contexts;

let closingNotifier = tryCatch(closingSelector)(value);
if (closingNotifier === errorObject) {
this._error(errorObject.e);
} else {
let context = {
buffer: <T[]>[],
subscription: new Subscription()
};
contexts.push(context);
const subscriber = new BufferToggleClosingsSubscriber<T>(this, context);
const subscription = closingNotifier.subscribe(subscriber);
context.subscription.add(subscription);
this.add(subscription);
}
notifyNext(outerValue: any, innerValue: O,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, O>): void {
outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue);
}

closeBuffer(context: BufferContext<T>) {
const contexts = this.contexts;
if (contexts === null) {
return;
}
const { buffer, subscription } = context;
this.destination.next(buffer);
contexts.splice(contexts.indexOf(context), 1);
this.remove(subscription);
subscription.unsubscribe();
notifyComplete(innerSub: InnerSubscriber<T, O>): void {
this.closeBuffer((<any> innerSub).context);
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> {
constructor(private parent: BufferToggleSubscriber<T, O>) {
super(null);
private openBuffer(value: O): void {
try {
const closingSelector = this.closingSelector;
const closingNotifier = closingSelector.call(this, value);
if (closingNotifier) {
this.trySubscribe(closingNotifier);
}
} catch (err) {
this._error(err);
}
}

protected _next(value: O) {
this.parent.openBuffer(value);
}
private closeBuffer(context: BufferContext<T>): void {
const contexts = this.contexts;

protected _error(err: any) {
this.parent.error(err);
if (contexts && context) {
const { buffer, subscription } = context;
this.destination.next(buffer);
contexts.splice(contexts.indexOf(context), 1);
this.remove(subscription);
subscription.unsubscribe();
}
}

protected _complete() {
// noop
}
}
private trySubscribe(closingNotifier: any): void {
const contexts = this.contexts;

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class BufferToggleClosingsSubscriber<T> extends Subscriber<any> {
constructor(private parent: BufferToggleSubscriber<T, any>,
private context: { subscription: any, buffer: T[] }) {
super(null);
}
const buffer: Array<T> = [];
const subscription = new Subscription();
const context = { buffer, subscription };
contexts.push(context);

protected _next() {
this.parent.closeBuffer(this.context);
}
const innerSubscription = subscribeToResult(this, closingNotifier, <any>context);

protected _error(err: any) {
this.parent.error(err);
}
if (!innerSubscription || innerSubscription.isUnsubscribed) {
this.closeBuffer(context);
} else {
(<any> innerSubscription).context = context;

protected _complete() {
this.parent.closeBuffer(this.context);
this.add(innerSubscription);
subscription.add(innerSubscription);
}
}
}
}