Replay subscribes multiple times to upstream sequence if that sequence completes #1155

Closed
5 of 17 tasks
hannesstruss opened this issue Mar 27, 2017 · 5 comments

Comments

@hannesstruss

Short description of the issue:

Snippet 1:

let connectable: ConnectableObservable<Int> = Observable.deferred {
  print("Doing work!")
  return Observable.just(1)
}.replay(1)

connectable.connect()
connectable.connect()

Snippet 2:

let connectable: ConnectableObservable<Int> = Observable.deferred {
  print("Doing work!")
  return Observable.never().startWith(1)
}.replay(1)

connectable.connect()
connectable.connect()

Expected outcome:

Both Snippet 1 and Snippet 2 should print "Doing work!" exactly once.

What actually happens:

Snippet 1 prints "Doing work!" twice. Snippet 2 prints once, as expected. This seems to be a regression between 3.1.0 and 3.2.0. The issue persists in 3.3.1.

RxSwift/RxCocoa/RxBlocking/RxTest version/commit

3.2.0-3.3.1

Platform/Environment

  • iOS
  • macOS
  • tvOS
  • watchOS
  • playgrounds

How easy is it to reproduce? (chance of a successful repro after running the self-contained code)

  • easy, 100% repro
  • sometimes, 10%-100%
  • hard, 2% - 10%
  • extremely hard, 0% - 2%

Xcode version:

Version 8.2.1 (8C1002)

⚠️ Fields below are optional for general issues or in case those questions aren't related to your issue, but filling them out will increase the chances of getting your issue resolved. ⚠️

Installation method:

  • CocoaPods
  • Carthage
  • Git submodules

I have multiple versions of Xcode installed:
(so we can know if this is a potential cause of your issue)

  • yes (which ones)
  • no

Level of RxSwift knowledge:
(this is so we can understand your level of knowledge
and formulate the response in an appropriate manner)

  • just starting
  • I have a small code base
  • I have a significant code base
@kzaher
Member

kzaher commented Mar 27, 2017

Hi @hannesstruss ,

I think this is related to #1111.

We'll try to fix these issues.

In the meantime, I think this should work as a workaround:

let connectable: ConnectableObservable<Int> = Observable.deferred {
  print("Doing work!")
  return Observable.just(1)
}.subscribeOn(CurrentThreadScheduler.instance).replay(1)

@hannesstruss
Author

@kzaher The workaround doesn't seem to do the trick for me – I'll track #1111 though. Thanks!

@RafaelPlantard

Same for me, @hannesstruss.

@kzaher
Member

kzaher commented May 28, 2017

Hi guys,

I've investigated this further. It looks like RxJS and RxJava actually behave the way we do now, as of 3.2.0.

RxJava

ConnectableObservable<String> conn = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        System.out.println("Doing work");
        return Observable.just("result");
    }
}).replay(1);

conn.subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        System.out.println("next" + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        System.out.println("error " + throwable.toString());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("completed");
    }
});

conn.connect();
conn.connect();

RxJS

var xs = Rx.Observable.defer(x => {
  console.log("Doing work! " + new Date().getTime())
  return Rx.Observable.of(new Date().getTime())
}).multicast(Rx.ReplaySubject.create(1))
xs.connect()
xs.connect()

I think there was actually a bug before 3.2.0: the connection was not cleared once the sequence terminated.

I've managed to solve other sharing problems, but the more I look at this one, the more it seems to me that the current behavior is actually correct.
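
To make the difference concrete, here is a minimal toy model of the two behaviors. It is only a sketch: `ColdSource`, `ToyConnectable`, `clearOnTermination`, and `countWork` are hypothetical names invented for illustration, the model is fully synchronous, and it is not how RxSwift implements `replay` or `connect`. It only assumes what is described above, namely that the connection is cleared once the source terminates.

```swift
// Toy model only. These types are hypothetical and are NOT RxSwift's implementation.

/// A cold source: `work` runs on every subscription.
/// `completes` says whether the source terminates right after emitting.
struct ColdSource {
    let completes: Bool
    let work: () -> Void
}

final class ToyConnectable {
    private let source: ColdSource
    private let clearOnTermination: Bool
    private var isConnected = false

    init(source: ColdSource, clearOnTermination: Bool) {
        self.source = source
        self.clearOnTermination = clearOnTermination
    }

    func connect() {
        // An active connection is reused, so the source is not subscribed again.
        guard !isConnected else { return }
        isConnected = true
        source.work() // subscribing to the source triggers its side effect

        // Assumed 3.2.0+ behavior: a terminated source drops the connection,
        // so the next connect() subscribes to the source again.
        if source.completes && clearOnTermination {
            isConnected = false
        }
    }
}

/// Calls connect() twice and returns how many times the work ran.
func countWork(completes: Bool, clearOnTermination: Bool) -> Int {
    var count = 0
    let connectable = ToyConnectable(
        source: ColdSource(completes: completes, work: { count += 1 }),
        clearOnTermination: clearOnTermination
    )
    connectable.connect()
    connectable.connect()
    return count
}

print(countWork(completes: true, clearOnTermination: true))   // 2, like Snippet 1 on 3.2.0+
print(countWork(completes: false, clearOnTermination: true))  // 1, like Snippet 2
print(countWork(completes: true, clearOnTermination: false))  // 1, like the pre-3.2.0 behavior
```

In this model a source that never completes keeps its connection alive, which is why Snippet 2 is unaffected by the change.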

In light of all this, I would like to close this issue. Changing this behavior would make us inconsistent with other Rx implementations, and it would make our lives harder because some of the new work actually relies on this behavior.

@hannesstruss
Author

Alright, thanks for looking into it!

3 participants