Skip to content

Commit

Permalink
fix(publish,publishReplay): resolve sharing Subject
Browse files Browse the repository at this point in the history
change publish operator to use factory
change publishReplay operator to not share ReplaySubject
fixes issue ReactiveX#5411
  • Loading branch information
e-davidson authored and cartant committed May 18, 2021
1 parent 190e4d0 commit c9112fd
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 7 deletions.
26 changes: 25 additions & 1 deletion spec/operators/publish-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators';
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';

/** @test {publish} */
describe('publish operator', () => {
Expand Down Expand Up @@ -337,4 +337,28 @@ describe('publish operator', () => {
expect(subscriptions).to.equal(1);
done();
});

it('should subscribe to its own source when using a shared pipeline', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';

const sharedPipeLine = pipe(
publish()
);

const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable<any>;
const expected1 = '-1-2-3-4-5-|';
const expected2 = '-6-7-8-9-0-|';

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
26 changes: 25 additions & 1 deletion spec/operators/publishReplay-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs';
import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators';

/** @test {publishReplay} */
Expand Down Expand Up @@ -487,4 +487,28 @@ describe('publishReplay operator', () => {
expectObservable(published).toBe(expected, undefined, error);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should subscribe to its own source when using a shared pipeline', () => {
const source1 = cold('-1-2-3-4-5-|');
const source2 = cold('-6-7-8-9-0-|');
const expected1 = '-1-2-3-4-5-|';
const expected2 = '-6-7-8-9-0-|';
const source1Subs = '^ !';
const source2Subs = '^ !';

const sharedPipeLine = pipe(
publishReplay(1)
);

const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
2 changes: 1 addition & 1 deletion src/internal/operators/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ export function publish<T, O extends ObservableInput<any>>(selector: (shared: Ob
* Details: https://rxjs.dev/deprecations/multicasting
*/
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
return selector ? connect(selector) : multicast(new Subject<T>());
return selector ? (source) => connect(selector)(source) : (source) => multicast(new Subject<T>())(source);
}
5 changes: 1 addition & 4 deletions src/internal/operators/publishReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,8 @@ export function publishReplay<T, R>(
if (selectorOrScheduler && !isFunction(selectorOrScheduler)) {
timestampProvider = selectorOrScheduler;
}

const selector = isFunction(selectorOrScheduler) ? selectorOrScheduler : undefined;
const subject = new ReplaySubject<T>(bufferSize, windowTime, timestampProvider);

// Note, we're passing `selector!` here, because at runtime, `undefined` is an acceptable argument
// but it makes our TypeScript signature for `multicast` unhappy (as it should, because it's gross).
return (source: Observable<T>) => multicast(subject, selector!)(source);
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, timestampProvider), selector!)(source);
}

0 comments on commit c9112fd

Please sign in to comment.