diff --git a/spec/operators/bufferToggle-spec.ts b/spec/operators/bufferToggle-spec.ts index 1c6952f75a..96b539a1e1 100644 --- a/spec/operators/bufferToggle-spec.ts +++ b/spec/operators/bufferToggle-spec.ts @@ -343,7 +343,7 @@ describe('Observable.prototype.bufferToggle', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); - it('should accept closing selector returns promise resolves', (done: DoneSignature) => { + it('should accept closing selector that returns a resolved promise', (done: DoneSignature) => { const e1 = Observable.concat(Observable.of(1), Observable.timer(10).mapTo(2), Observable.timer(10).mapTo(3), @@ -361,7 +361,7 @@ describe('Observable.prototype.bufferToggle', () => { }); }); - it('should accept closing selector returns promise rejects', (done: DoneSignature) => { + it('should accept closing selector that returns a rejected promise', (done: DoneSignature) => { const e1 = Observable.concat(Observable.of(1), Observable.timer(10).mapTo(2), Observable.timer(10).mapTo(3), diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index 686043e3b5..a21c73f179 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -23,6 +23,34 @@ describe('Observable.prototype.publishReplay', () => { expect(source instanceof Rx.ConnectableObservable).toBe(true); }); + it('should follow the RxJS 4 behavior and NOT allow you to reconnect by subscribing again', (done: DoneSignature) => { + const expected = [1, 2, 3, 4]; + let i = 0; + + const source = Observable.of(1, 2, 3, 4).publishReplay(1); + + const results = []; + + source.subscribe( + (x: number) => { + expect(x).toBe(expected[i++]); + }, + done.fail, + () => { + i = 0; + + source.subscribe((x: number) => { + results.push(x); + }, done.fail, done); + + source.connect(); + }); + + source.connect(); + + expect(results).toEqual([4]); + }); + it('should do nothing if connect is not called, despite subscriptions', () => { const source = cold('--1-2---3-4--5-|'); const sourceSubs = []; @@ -356,32 +384,4 @@ describe('Observable.prototype.publishReplay', () => { published.connect(); }); - - it('should follow the RxJS 4 behavior and NOT allow you to reconnect by subscribing again', (done: DoneSignature) => { - const expected = [1, 2, 3, 4]; - let i = 0; - - const source = Observable.of(1, 2, 3, 4).publishReplay(1); - - const results = []; - - source.subscribe( - (x: number) => { - expect(x).toBe(expected[i++]); - }, - done.fail, - () => { - i = 0; - - source.subscribe((x: number) => { - results.push(x); - }, done.fail, done); - - source.connect(); - }); - - source.connect(); - - expect(results).toEqual([4]); - }); }); \ No newline at end of file diff --git a/src/operator/bufferToggle.ts b/src/operator/bufferToggle.ts index bbc23d3407..7ad7524c60 100644 --- a/src/operator/bufferToggle.ts +++ b/src/operator/bufferToggle.ts @@ -37,10 +37,10 @@ import {InnerSubscriber} from '../InnerSubscriber'; * * @param {Observable} openings An observable of notifications to start new * buffers. - * @param {function(value: O): Observable} closingSelector A function that takes - * the value emitted by the `openings` observable and returns an Observable, - * which, when it emits, signals that the associated buffer should be emitted - * and cleared. + * @param {function(value: O): SubscribableOrPromise} closingSelector A function + * that takes the value emitted by the `openings` observable and returns + * a Subscribable or Promise, which, when it emits, signals that the associated + * buffer should be emitted and cleared. * @return {Observable} An observable of arrays of buffered values. * @method bufferToggle * @owner Observable @@ -157,13 +157,13 @@ class BufferToggleSubscriber extends OuterSubscriber { const innerSubscription = subscribeToResult(this, closingNotifier, context); - if (!innerSubscription.isUnsubscribed) { + if (innerSubscription.isUnsubscribed) { + this.closeBuffer(context); + } else { ( innerSubscription).context = context; this.add(innerSubscription); subscription.add(innerSubscription); - } else { - this.closeBuffer(context); } } }