Skip to content

Commit

Permalink
fix(bufferToggle): handle closingSelector completes immediately
Browse files Browse the repository at this point in the history
relates to ReactiveX#1487
  • Loading branch information
kwonoj committed Mar 19, 2016
1 parent 760f36b commit 959990d
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 38 deletions.
12 changes: 12 additions & 0 deletions spec/operators/bufferToggle-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,4 +380,16 @@ describe('Observable.prototype.bufferToggle', () => {
done.fail();
});
});

it('should handle empty closing observable', () => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------|');
const subs = '^ !';
const e2 = cold('--x-----------y--------z---| ');
const expected = '--l-----------m--------n-----------|';

const result = e1.bufferToggle(e2, () => Observable.empty());

expectObservable(result).toBe(expected, {l: [], m: [], n: []});
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});
56 changes: 28 additions & 28 deletions spec/operators/publishReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ describe('Observable.prototype.publishReplay', () => {
expect(source instanceof Rx.ConnectableObservable).toBe(true);
});

it('should follow the RxJS 4 behavior and NOT allow you to reconnect by subscribing again', (done: DoneSignature) => {
const expected = [1, 2, 3, 4];
let i = 0;

const source = Observable.of(1, 2, 3, 4).publishReplay(1);

const results = [];

source.subscribe(
(x: number) => {
expect(x).toBe(expected[i++]);
},
done.fail,
() => {
i = 0;

source.subscribe((x: number) => {
results.push(x);
}, done.fail, done);

source.connect();
});

source.connect();

expect(results).toEqual([4]);
});

it('should do nothing if connect is not called, despite subscriptions', () => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = [];
Expand Down Expand Up @@ -356,32 +384,4 @@ describe('Observable.prototype.publishReplay', () => {

published.connect();
});

it('should follow the RxJS 4 behavior and NOT allow you to reconnect by subscribing again', (done: DoneSignature) => {
const expected = [1, 2, 3, 4];
let i = 0;

const source = Observable.of(1, 2, 3, 4).publishReplay(1);

const results = [];

source.subscribe(
(x: number) => {
expect(x).toBe(expected[i++]);
},
done.fail,
() => {
i = 0;

source.subscribe((x: number) => {
results.push(x);
}, done.fail, done);

source.connect();
});

source.connect();

expect(results).toEqual([4]);
});
});
25 changes: 15 additions & 10 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {

private closeBuffer(context: BufferContext<T>): void {
const contexts = this.contexts;
if (contexts === null) {
return;

if (contexts && context) {
const { buffer, subscription } = context;
this.destination.next(buffer);
contexts.splice(contexts.indexOf(context), 1);
this.remove(subscription);
subscription.unsubscribe();
}
const { buffer, subscription } = context;
this.destination.next(buffer);
contexts.splice(contexts.indexOf(context), 1);
this.remove(subscription);
subscription.unsubscribe();
}

private trySubscribe(closingNotifier: any): void {
Expand All @@ -134,10 +134,15 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
contexts.push(context);

const innerSubscription = subscribeToResult(this, closingNotifier, <any>context);
(<any> innerSubscription).context = context;

this.add(innerSubscription);
subscription.add(innerSubscription);
if (!innerSubscription.isUnsubscribed) {
(<any> innerSubscription).context = context;

this.add(innerSubscription);
subscription.add(innerSubscription);
} else {
this.closeBuffer(context);
}
}
}

Expand Down

0 comments on commit 959990d

Please sign in to comment.