Skip to content

Commit

Permalink
fix(shareReplay): properly retains history on subscribe (#2910)
Browse files Browse the repository at this point in the history
When the refCount hits zero, and the source has neither completed nor errored, `shareReplay` will now properly stay subscribed to the source, and retain the internal ReplaySubject that is caching values

fixes #2908
  • Loading branch information
benlesh authored Oct 6, 2017
1 parent 124e231 commit accbcd0
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 12 deletions.
15 changes: 15 additions & 0 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;
declare const time: typeof marbleTestingSignature.time;
declare const rxTestScheduler: typeof marbleTestingSignature.rxTestScheduler;

const Observable = Rx.Observable;

Expand Down Expand Up @@ -164,4 +166,17 @@ describe('Observable.prototype.shareReplay', () => {
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should not restart if refCount hits 0 due to unsubscriptions', () => {
const results = [];
const source = Rx.Observable.interval(10, rxTestScheduler)
.take(10)
.shareReplay(1);
const subs = source.subscribe(x => results.push(x));
rxTestScheduler.schedule(() => subs.unsubscribe(), 35);
rxTestScheduler.schedule(() => source.subscribe(x => results.push(x)), 54);

rxTestScheduler.flush();
expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]);
});
});
44 changes: 32 additions & 12 deletions src/operators/shareReplay.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,45 @@
import { Observable } from '../Observable';
import { multicast } from './multicast';
import { refCount } from './refCount';
import { ReplaySubject } from '../ReplaySubject';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { IScheduler } from '../Scheduler';

import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction } from '../interfaces';

/**
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler ): MonoTypeOperatorFunction<T> {
let subject: ReplaySubject<T>;
let refCount = 0;
let subscription: Subscription;
let hasError = false;
let isComplete = true;

const connectable = multicast(function shareReplaySubjectFactory(this: ConnectableObservable<T>) {
if (this._isComplete) {
return subject;
} else {
return (subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler));
return (source: Observable<T>) => new Observable<T>(observer => {
refCount++;
if (!subject || hasError) {
hasError = false;
subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
subscription = source.subscribe({
next(value) { subject.next(value); },
error(err) {
hasError = true;
subject.error(err);
},
complete() {
isComplete = true;
subject.complete();
},
});
}

const innerSub = subject.subscribe(observer);

return () => {
refCount--;
innerSub.unsubscribe();
if (subscription && refCount === 0 && !isComplete) {
subscription.unsubscribe();
}
};
});
return ((source: Observable<T>) => refCount()(connectable(source))) as MonoTypeOperatorFunction<T>;
};
};

0 comments on commit accbcd0

Please sign in to comment.