-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
publishBehavior and publishReplay semantics when completed #453
Comments
In RxJava this behaves as so: ConnectableObservable<Integer> conn = just(1).doOnSubscribe(() -> System.out.println("subscribed")).publish();
conn.subscribe(System.out::println);
conn.subscribe(System.out::println);
conn.connect();
conn.subscribe(System.out::println);
conn.subscribe(System.out::println);
conn.connect(); outputs:
however in RxJS 4 the same thing: let s = Observable.create(observer => {
console.log('subscribed');
observer.onNext(1);
observer.onCompleted();
})
.publish();
s.subscribe(::console.log);
s.subscribe(::console.log);
s.connect();
s.subscribe(::console.log);
s.subscribe(::console.log);
s.connect(); outputs only:
RxJS 5(Next) was designed to match the semantics of RxJava, and also to remove the potential footgun from people passing the same subject into two |
Isn't |
The the problem RxJS 5 is trying to solve here is that ConnectableObservables should be "reconnectable", and |
Why is the lack of reconnectability a problem? Also, we're talking about publishFoo so far, not yet about |
I recall Erik Meijer recommending to "make the correct patterns easy and make the wrong patterns possible", so it's easy to detect bad patterns in code reviews. Passing directly a subject to multiple multicasts is easy to catch in code reviews. |
If you want to model anything you should be able to reconnect to, say a WebSocket, it's a problem.
The reason I brought up refCount is it's the behavior of RxJS 2/3/4 regarding ConnectableObservables that makes this impossible. FWIW, I don't think any other flavor of Rx behaves like RxJS in this manner. I'm not sure what you want is to make ConnectableObservables broken after the the underlying subject has unsubscribed. I think what you want is a specific behavior around |
I guess you meant "subject has completed". That's what I mean at least. I think the ConnectableObservable could still be connected. Maybe I should explain what should happen in marble diagrams:
Capital
Can't this approach below model that? var reconnectionMoments = // some infinite Observable
reconnectionMoments
.switchMap(moment => Rx.DOM.fromWebSocket('ws://echo.websockets.org', null))
.subscribe(/* ... */) |
I had a typo/mistake in the marble diagram. Second subscriber should see |
Sure that's the behavior of the
That actually doesn't help, because It's better to be able to say: |
Question: Why is it important to you that ConnectableObservables never allow reconnection? What use case does that serve? |
I'm not sure how anything passed in those tests above, though. expect(results1).toBe([0,1,2,3,4]); would never assert |
... it seems like your issue is specifically with ReplaySubject. Which is the only subject that will actually emit values after it's been disposed/completed/errored. It seems like the ReplaySubject semantics in RxJS 4/3/2 are actually broken. A Subject that has errored should still emit values. A subject that has completed shouldn't still emit values: In RxJS 4: var sub = new Rx.ReplaySubject(2);
sub.subscribe(::console.log, ::console.error);
Rx.Observable.range(0, 3).concat(Rx.Observable.throw('bad')).subscribe(sub);
sub.subscribe(::console.log, ::console.error); outputs:
That doesn't seem right, a Subject that's in error can still emit values? |
Reconnectability is not my gripe. It's about what happens in the space between the subject's complete and the next connect. In particular, we are discussing what should happen to
I'm saying subscriber2 should see
Well then this was an unfortunate example to talk about, because a ConnectableObservable is not an Observer.
This behavior can be built with the switchMap and a publish() (with a one-time connect()), by modifying
Because (refer to the marble diagram above) it serves the use case of giving me a guarantee of what subscribers see after the subject has completed will follow the same conventions as subscribing to an actual subject. These guarantees might be particularly important in Replay case, for instance. I have a cold Observable, I want multiple subscribers to see the same events simultaneously, and after the cold Observable completes, I want it to replay its values to late subscribers. Because that's what ReplaySubjects do. And in case it's a publishBehavior, I do not want it to emit any value after the cold Observable has completed, because that's the BehaviorSubject contract. I do not want to get the
That was wrong indeed. Those tests weren't actually jasmine tests. I just wrote pseudocode. But I'm sure that with console.log
It is correct for two reasons: from the perspective of an Observer, the Observable contract is preserved. Second reason because it's a ReplaySubject and that's what they do: they replay the values and the error. Your example above of a reconnectable Observable would also "violate" this contract because let s = Observable.create(observer => {
console.log('subscribed');
observer.onNext(1);
observer.onCompleted();
})
.publish();
s.subscribe(::console.log);
s.subscribe(::console.log);
s.connect();
s.subscribe(::console.log);
s.subscribe(::console.log);
s.connect(); should emit
And after the first instance, the observable completed, so "why can an Observable which completed still emit values?" Isn't that a Replay feature? ;) |
Ah... a side effect of the implementation, I was thinking that this used @trxcllnt's |
In speaking with some RxJava folks, it might be that we just need to implement |
That would be a very bold move, would be a big shift in Rx's foundation with regard to cold vs hot. |
Nah... I'm not talking "all observables" ... just the ones returned from |
Wow, this is a long thread ;-) But I think that I am with @staltz on this
Until the next |
@headinthebox, I don't disagree with that at all. The thing that @staltz didn't like is if the connectable observable completes, at next connect it recycles the underlying subject. This is how it works in RxJava as well, essentially. RxJS 4/3/2 would instead try to use the same underlying subject, meaning the connectable observable was "single use only". @staltz had come to depend on this behavior with regards to replay subjects, I think. In all discussions with @benjchristensen, @jhusian and many others, it was decided this behavior was "broken". |
This is unexpected new behavior. Could anybody clarify how I can prevent that replay values are dropped once complete? I need to be able to replay what has happened in past. Usually, this would have been a simple |
@Blesh I started the process of migrating Cycle.js to RxJS Next with the help of the community, and we hit our first big problem with this new behavior of connectable Observables. For my purposes, ConnectableObservable is "broken" here. Please consider that a lot of existing codebases out there might depend on how ConnectableObservable works. Our case goes like this:
To me, this is a really really big issue, and honestly, I'd consider not using RxJS Next at all in the Cycle.js community. Can you please consider alternatives? For instance:
Actually it's not recycling at the next connect(), it's recycling on complete. This happens in order in time: (1) first connect(), (2) source completes, (3) second connect(). Recycling is happening on moment (2), not on moment (3). I'd be perfectly ok with recycling on (3). But with it on (2), it goes against BehaviorSubject and ReplaySubject semantics after completed (therefore unintuitive, we're opening possibilities to confuse developers), and also blocks me from accomplishing what I need to accomplish. The other thing I'd propose is to look for different ways of achieving "reconnection". In my experience I have never had to do multiple calls to connect(). Doing complicated Why can't this approach work for reconnecting? I suggested it above but you didn't comment on it.
Please. |
@Blesh I'm also with @staltz and @headinthebox on this one. Changing the semantics now would be really confusing and not bring much benefit. |
For example this test fails (it timeouts): it('publishReplay(1) should replay values just after connect() is called', function (done) {
var obs = Observable.of('foo').publishReplay(1);
obs.connect();
obs.subscribe(
function (x) { expect(x).toBe('foo'); },
done.fail,
done
);
}); |
Is there any particular reason why this issue hasn’t been labeled discussion? Is it not up for discussion? Have the final words been said and the decision is final? It seems that those, e.g., @benjchristensen, @jhusain et al, who initially made the decision of this new alignment with RxJava are not even participating here. |
Sorry... I didn't label it. :\ heh
FWIW: I'm also one of those people that made the decision, from discussion with several others really early on in the development of this library who include @headinthebox, @abersnaze, @stealthcoder and @mattpodwysocki... There were changes made around multicast to help prevent certain antipatterns (like reusing the same Subject in two multicast calls) as well as facilitate hot observables being resubscribable. The old behavior was quirky and not friendly to new developers in particular. However, the side-effect of this change is that shareReplay and shareBehavior now act differently when the refCount returns to zero. |
@Blesh Thanks for the info. |
Seems like this issue shouldn't cause a fork. We can leave the existing multicast operator backward-compatible. In my experience the most common indirect use of multi cast is in the following scenarios:
As long as there are two operators that make these scenarios easy, I don't really care what the multicast operator does. I propose we change share() and avoid using the multicast operator to allow it to be retried. Given it is so commonly used for network requests, this seems like the right decision. The shareReplay operator doesn't need retry because it's results are cached. It would seem to satisfy the second scenario. |
To be clear: the new behavior isn't correct because @Blesh says so, it's correct because it's referentially transparent. Connecting a ConnectableObservable should behave the same way every time, regardless of whether you've connected it before or not. We also have the option to change |
This is a good middle-ground and the type of thing we've done in RxJava. |
@trxcllnt no, that's not the way it was designed nor intended to be designed, unless you happened to be in the original design meetings. It's YOUR opinion as to whether it was right or wrong. |
@mattpodwysocki that's exactly my point: it's not "correct" because someone decided so. It's correct because it's functionally pure, and the behavior is consistent with the rest of the library. The Observable referential transparency is beautiful, allowing them to be retried, etc. In the original design, ConnectableObservables aren't referentially transparent; it behaves differently whether you connect it the first time vs. the second. My argument is the same here as it is in other discussions re: hot-by-default, primarily: if we start from the pure implementation, we can graduate to the impure as-needed. If we start impure, we can never go back to pure. I'm arguing for this specifically because I've been bitten by repeat/retry on ConnectableObservable failing. |
OMG drama. Stop. This issue is open for discussion and compromise. Okay, so here's a proposed compromise:
|
@benlesh what if |
I'd prefer that |
I'm no expert of the inner workings of Rx, but in my attempts to update a library from Rx 4.0.6 to Rx 5.0.0-alpha.6 I was unable to recreate the behavior that I came to expect regarding |
Correct, let's do that and sorry about the community fork comment. |
It's all good... and to show it's all good, I made this emoji kiss poop: 😘💩 The agreed upon changes are merged and happy. We still need more tests around these operators though. |
Much better answer, thank you! |
Like! |
Thanks! |
👍 |
Great! Now we can write more articles, like this one: https://medium.com/@fkrautwald/plug-and-play-all-your-observable-streams-with-cycle-js-e543fc287872 |
`shareReplay` returns an observable that is the source multicasted over a `ReplaySubject`. That replay subject is recycled on error from the `source`, but not on completion of the source. This makes `shareReplay` ideal for handling things like caching AJAX results, as it's retryable. It's repeat behavior, however, differs from `share` in that it will not repeat the `source` observable, rather it will repeat the `source` observable's values. related #2013, #453, #2043
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs. |
Continuing the discussion that started here: https://github.com/ReactiveX/RxJS/pull/235/files
RxJS Legacy (v4) passes these tests:
and
Yet RxJS Next behaves basically the opposite way:
Because in RxJS legacy there is
While in RxJS Next:
In my opinion, the new behavior in RxJS Next is illogical at first sight, and a potential source for a lot of developer confusion. Besides that, it differs from RxJS legacy features, making migration a potential headache. I think the semantics argument (with regard to subscriptions after complete(), publishReplay should behave just like a ReplaySubject and publishBehavior should behave just like a BehaviorSubject) is straightforward and also aligned with traditional Rx, such as in Rx.NET: check Lee Campbells book on this part.
The text was updated successfully, but these errors were encountered: