reusing publish or publishReplay in a pipe can subscribe to the wrong data source #5411

Closed
e-davidson opened this issue Apr 28, 2020 · 7 comments
Labels
bug Confirmed bug

Comments

@e-davidson
Contributor

Bug Report

Current Behavior

When you reuse a pipe that contains publish or publishReplay, every use of the pipe shares the same instance of ConnectableObservable.
As a result, connecting one of them also connects the others, and they all receive data from the source that was connected.

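import { pipe } from 'rxjs';
import { map, publish } from 'rxjs/operators';

// src1 and src2 are two independent source observables (defined in the reproduction).
const pipeLine = pipe(
  map((x: number) => x * x),
  publish(),
);

const obs1 = src1.pipe(pipeLine);
const obs2 = src2.pipe(pipeLine);

obs1.subscribe(d => console.log('sub 1', d));
obs2.subscribe(d => console.log('I should not be running', d));

// Only obs1 is connected; obs2 is never connected.
obs1.connect();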

This is what you get on the console:

sub 1 1
I should not be running 1
sub 1 4
I should not be running 4
sub 1 9
I should not be running 9
sub 1 16
I should not be running 16
sub 1 25
I should not be running 25

obs2, which was never connected, has been connected, and it is connected to the wrong source.
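The mechanism can be illustrated without RxJS. The following is a minimal sketch, assuming the root cause is a subject created once when the operator is called and then reused for every source. `ToySubject`, `ToyConnectable`, and `toyPublishEager` are hypothetical names invented for this illustration; they are not RxJS APIs and do not reproduce the library's implementation.

```typescript
// Toy model only: these types are NOT RxJS, they just mimic the shape of the problem.
type ToyListener<T> = (value: T) => void;

class ToySubject<T> {
  private listeners: ToyListener<T>[] = [];
  subscribe(listener: ToyListener<T>): void {
    this.listeners.push(listener);
  }
  next(value: T): void {
    this.listeners.forEach(l => l(value));
  }
}

interface ToyConnectable<T> {
  subscribe(listener: ToyListener<T>): void;
  connect(): void;
}

// Eager variant: the subject is created once, when toyPublishEager() is called,
// so every source the returned operator is applied to ends up sharing it.
function toyPublishEager<T>(): (source: T[]) => ToyConnectable<T> {
  const subject = new ToySubject<T>();
  return (source) => ({
    subscribe: (listener) => subject.subscribe(listener),
    connect: () => source.forEach(v => subject.next(v)),
  });
}

const eagerLog: string[] = [];
const eagerOp = toyPublishEager<number>();
const eager1 = eagerOp([1, 2, 3]);
const eager2 = eagerOp([10, 20, 30]);

eager1.subscribe(v => eagerLog.push(`sub 1 ${v}`));
eager2.subscribe(v => eagerLog.push(`sub 2 ${v}`));
eager1.connect(); // eager2 is never connected

// Both subscribers receive the values of the first source, interleaved.
console.log(eagerLog);
```

In this model the second subscriber is attached to the shared subject, so it sees the first source's values even though its own connectable was never connected, which mirrors the console output above.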

Reproduction

https://stackblitz.com/edit/rxjs-nrcyfu

Expected behavior

Reusing pipes should not affect which source connects to which subscription.

Only the 1st subscription should run as the 2nd one hasn't been connected yet.
Also the 2nd subscription should not be subscribed to the first observable.

Workaround

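Use a factory function instead, and call it once per source (for example src1.pipe(pipeLine())), so that each source gets its own publish() instance.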

    const pipeLine = () =>  pipe(
      map((x: number) => x * x),
      publish(),
    );

Possible Solution

for publishReplay
https://github.com/ReactiveX/rxjs/blob/259e5cd2e8149bc5db14cc0216afec7131e8aa11/src/internal/operators/publishReplay.ts

Instead of creating the replay subject on line 22, it can be created in the callback on line 24. That way each source observable would have its own replay subject.

for publish

https://github.com/ReactiveX/rxjs/blob/259e5cd2e8149bc5db14cc0216afec7131e8aa11/src/internal/operators/publish.ts

On line 61 we can return a factory that takes the source instead of calling multicast directly.
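The idea behind both proposals, deferring subject creation until the operator is applied to a source, can be sketched in a dependency-free toy model. This is an illustration under stated assumptions, not the actual patch: `DeferredSubject`, `DeferredConnectable`, and `toyPublishDeferred` are hypothetical names and are not part of RxJS.

```typescript
// Toy model only: NOT RxJS code, just the "create the subject per source" idea.
type DeferredListener<T> = (value: T) => void;

class DeferredSubject<T> {
  private listeners: DeferredListener<T>[] = [];
  subscribe(listener: DeferredListener<T>): void {
    this.listeners.push(listener);
  }
  next(value: T): void {
    this.listeners.forEach(l => l(value));
  }
}

interface DeferredConnectable<T> {
  subscribe(listener: DeferredListener<T>): void;
  connect(): void;
}

// Deferred variant: the subject is created inside the returned function,
// i.e. each time the operator is bound to a source, not when the operator is built.
function toyPublishDeferred<T>(): (source: T[]) => DeferredConnectable<T> {
  return (source) => {
    const subject = new DeferredSubject<T>(); // one subject per source
    return {
      subscribe: (listener) => subject.subscribe(listener),
      connect: () => source.forEach(v => subject.next(v)),
    };
  };
}

const deferredLog: string[] = [];
const deferredOp = toyPublishDeferred<number>(); // a single operator instance, reused below
const deferred1 = deferredOp([1, 2, 3]);
const deferred2 = deferredOp([10, 20, 30]);

deferred1.subscribe(v => deferredLog.push(`sub 1 ${v}`));
deferred2.subscribe(v => deferredLog.push(`sub 2 ${v}`));
deferred1.connect(); // deferred2 is never connected

// Only the first subscriber receives values.
console.log(deferredLog);
```

With the subject created per source, one operator instance can be applied to several sources without their subscribers seeing each other's data.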

cartant added the bug (Confirmed bug) label Apr 28, 2020
@cartant
Collaborator

cartant commented Apr 28, 2020

Yeah, this looks like a bug to me. Thanks.

e-davidson added a commit to e-davidson/rxjs that referenced this issue Jul 10, 2020
change publish operator to use factory
change publishReplay operator to not share ReplaySubject
fixes issue ReactiveX#5411
@kawazoe

kawazoe commented Jul 17, 2020

I have just stumbled on this issue as well in a project I am working on.

I am very confused by this since it appears to be pretty major to me. Some very popular blog posts explaining how those operators work also describe publishReplay(x) as an equivalent to multicast(() => new ReplaySubject(x)). The proposed solution for publishReplay would make this true.

Since you mentioned that the current behavior indeed looks like a bug, I feel like the article should be true... Except that I cannot find any trace of this operator ever working this way before. Unfortunately, the history doesn't go past 2018 so I am not convinced about this.

Would you mind sharing your view on this @cartant ? Is this really the correct behavior? Is using multicast(() => new ReplaySubject()) an appropriate workaround until a fix comes to light?

@cartant
Collaborator

cartant commented Jul 18, 2020

@kawazoe

Would you mind sharing your view on this cartant ? Is this really the correct behavior?

There is no fix needed for publishReplay's behaviour. It does not restart by design. That is the essential difference between the publish and share family of operators. For reference, see the discussion in #453. It's long, so start at this comment and work backwards.

@kawazoe

kawazoe commented Jul 18, 2020

@cartant I've just gone through the entire thread and wow... what a can of worms this is... I definitely did not expect this issue to be this complex when I asked that initial question. It also explains why I felt like their behavior changed at some point.

Thanks a LOT for that pointer. I don't know why I couldn't find that issue initially.

I agree with the final decision, but I think the subtleties of those operators could be better documented. There is a reason why there are many people trying to explain them in blogs. They are not trivial to understand and have multiple edge cases like hot/cold sources, completed or ongoing sources, completed or ongoing subscribers, etc...

@cartant
Collaborator

cartant commented Jul 18, 2020

... I think the subtleties of those operators could be better documented.

Yeah. It is confusing. The publish variants should also be static and refCount should not be an operator - see #5585 (comment)

@e-davidson
Contributor Author

I just want to clarify: this bug has nothing to do with restarting.
The issue arises when you create a pipe without a source observable and then use .pipe() to bind that standalone pipe to more than one observable.

As presented here https://stackblitz.com/edit/rxjs-nrcyfu

    const pipeLine = () =>  pipe(
      map((x: number) => x * x),
      publishReplay(),
    );

The pipeLine should be reusable across multiple source observables, but it is not.

The proposed solution for #5411 does not change its restartability. Once it is bound to an observable, it still cannot restart. What the change does is move the creation of the subject into the factory, delaying its creation until the pipe is bound to an observable, as opposed to when the pipe function is called. This makes the pipe shareable but does not change the behavior once it is bound to an observable.

cartant pushed a commit to cartant/rxjs that referenced this issue May 14, 2021
change publish operator to use factory
change publishReplay operator to not share ReplaySubject
fixes issue ReactiveX#5411
cartant pushed a commit to cartant/rxjs that referenced this issue May 18, 2021
change publish operator to use factory
change publishReplay operator to not share ReplaySubject
fixes issue ReactiveX#5411
cartant pushed a commit to cartant/rxjs that referenced this issue May 20, 2021
change publish operator to use factory
change publishReplay operator to not share ReplaySubject
fixes issue ReactiveX#5411
benlesh pushed a commit that referenced this issue May 21, 2021
…ublish`, `publishReplay` are now referentially transparent. Meaning if you take the result of calling `publish()` and pass it to N observable `pipe` methods, it will behave the same in each case, rather than having a cumulative effect, which was a regression introduced sometime in 6.x. If you required this broken behavior, there is a work around posted [here](#6410 (comment)) (#6410)

* fix(publish,publishReplay): resolve sharing Subject

change publish operator to use factory
change publishReplay operator to not share ReplaySubject
fixes issue #5411

* test: rearrange tests

* test: add failing ref transparency tests

* fix(publishBehavior): make ref transparent

* fix(publishLast): make ref transparent

* test: add failing ref transparency tests

* fix(share): make ref transparent

* chore: add a comment

* test: change descriptions and add comments

* refactor: destructure options outside of op func

* chore: use consistent terminology in comments

* test: use consistent terminology

Co-authored-by: Eli Davidson <edavidson@broadfinancial.com>
@e-davidson
Contributor Author

This issue has been resolved in version 7.1.0
