Skip to content
This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

Simpler way to convert a stream to Behavior Subjects or some other alternative? #1085

Closed
tusharmath opened this issue Dec 28, 2015 · 13 comments

Comments

@tusharmath
Copy link

const subject = new rx.BehaviorSubject(100)
const even = subject.filter(x => x % 2 === 0)
const odd = subject.filter(x => x % 2 === 1)

subject.onNext(101)
subject.onNext(102)
subject.onNext(103)
subject.onNext(104)
subject.onNext(105)

even.subscribe(x => console.log('even', x))
odd.subscribe(x => console.log('odd', x))

/**output 
odd 105
**/

In the above case if the subscription is made later in time then only the last emitted value is dispatched.

What I would like is something like this —

/** output 
even 104
odd 105
**/

To do this the only way I have found is to create two behavior subjects for each one of the streams and and subscribe to those —

const subject = new rx.BehaviorSubject(100)

const subjectEven = new rx.BehaviorSubject()
const subjectOdd = new rx.BehaviorSubject()

const even = subject.filter(x => x % 2 === 0).subscribe(subjectEven)
const odd = subject.filter(x => x % 2 === 1).subscribe(subjectOdd)

subject.onNext(101)
subject.onNext(102)
subject.onNext(103)
subject.onNext(104)
subject.onNext(105)

subjectEven.subscribe(x => console.log('even', x))
subjectOdd.subscribe(x => console.log('odd', x))

Is there a way I can simplify this code and not have to create BehaviorSubjects for each of the the stream?

[closing #1083 #1084 in lieu if this issue]

@mattpodwysocki
Copy link
Member

@tusharmath why not use a ReplaySubject(2) which would keep two values in the cache until its needed?

@tusharmath
Copy link
Author

That would work but only for this use case. Say I add another filter —

const divisibleBy177 = subject.filter(x => x % 777 === 0)

Now what would be the buffer size?

What I am really wanting to do is something like what combineLatest is already doing, except that it only starts doing it once all the streams fire atleast once "post" subscription.

Can I implement a version of combineLatest (called replayLatest, for the lack of a better name), that works like this —

Observable.replayLatest(even, odd).subscribe(x => console.log(x))
/** output 
even 104
odd 105
**/

Thought's comments ?

@brucou
Copy link

brucou commented Jan 8, 2016

Would it work to put your replay at the end of each stream, i.e. something like even.replay(null, 1)? I wonder if withLatestFrom could also be used here. Or am I getting it wrong?

@tusharmath
Copy link
Author

Can you paste the exact code which I should be using?

@brucou
Copy link

brucou commented Jan 8, 2016

const subject = new rx.BehaviorSubject(100)
const even = subject.filter(x => x % 2 === 0).replay(null,1)
const odd = subject.filter(x => x % 2 === 1).replay(null,1)

I am working in RxJS 4. If you use the newer version, I don't know if the replay operator is still there.

@tusharmath
Copy link
Author

it doesn't output anything.

@brucou
Copy link

brucou commented Jan 8, 2016

Gosh that's unsurprising. I misunderstood your question. You are subscribing after all the data have been passed. Cf. http://stackoverflow.com/questions/32190445/hot-and-cold-observables-are-there-hot-and-cold-operators/34669444#34669444
The behaviour subject only keeps 1 value. If you really want to see all the data who were passed after the fact, and you don't know how many there are, then you have to use a replaySubject, as Matt was accurately suggesting. Cf. http://jsfiddle.net/b2dswaLd/1/
Otherwise what you did with the extra two behavior subjects is correct and in my opinion a better option than using a replaySubject which will keep the whole sequence in memory.
Side remark though, your issue is not really an issue (bug etc.) with RxJS implementation, I guess it belongs more to a Q&A forum like stackoverflow.

@tusharmath
Copy link
Author

@brucou Loved the links you just gave. I learnt quite a few things.
How about implementing something like — replayLatest()

rx.Observable.prototype.replayLatest = function () {
  const subject = new rx.BehaviorSubject()
  this.subscribe(subject)
  return subject
}

And then use it —

const subject = new rx.BehaviorSubject(100)
const even = subject.filter(x => x % 2 === 0).replayLatest()
const odd = subject.filter(x => x % 2 === 1).replayLatest()

subject.onNext(101)
subject.onNext(102)
subject.onNext(103)
subject.onNext(104)
subject.onNext(105)

even.subscribe(x => console.log('even', x))
odd.subscribe(x => console.log('odd', x))

/**
OUTPUT
even 104
odd 105
**/

@brucou
Copy link

brucou commented Jan 9, 2016

For your own needs, you can do whatever you please. If you publish a new Rx operator, it should however respect Rxjs contract and guidelines. Guidelines here : https://github.com/Reactive-Extensions/RxJS/tree/master/doc/designguidelines. Personally I would not find this operator to be a great addition to the library but who am I to judge. But for your personal needs, by all means go ahead.

@tusharmath
Copy link
Author

Thank you so much @brucou !

bouzuya pushed a commit to bouzuya/RxJS that referenced this issue Mar 23, 2016
Add test for groupBy() operator, to verify that it supports the composable lift architecture.

Test for issue Reactive-Extensions#1085.
bouzuya pushed a commit to bouzuya/RxJS that referenced this issue Mar 23, 2016
Fix bug Reactive-Extensions#1085 in which groupBy was not using lift(). Using lift() is critical to support the
ubiquitous lift-based architecture in RxJS Next

Resolves Reactive-Extensions#1085.
@jerradpatch
Copy link

jerradpatch commented Jul 1, 2017

I was actually looking for something like this. It works well in the case where .share() does not, ie: all subscribers dont subscribe before the source starts emitting yet want the latest value. Although, it could be improved by adding a subscriber count and unsubscribing when the count = 0;

@brucou
Copy link

brucou commented Oct 5, 2017

Long time passed, let me see what that was about again.

alright, main reason I guess I was thinking it is not such a good addition to the library is that there are already plenty of operators out there, and this one seems like it could be obtained easily from the existing ones, I don't think it breaks any concept in reactive programming or anything. It looks very close to publishBehavior (that is Rxjs v4, don't know what the guys did in the v5, but I am sure in the new version there is a variation of the multicast operator that allows you to replicate this behavior.). mmm actually I was looking for publishBehavior but it seems it was called publishValue??, cf. https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/publishvalue.js.
well it actually seems there is a [publishBehavior] in v5 (https://github.com/ReactiveX/rxjs/blob/master/src/operator/publishBehavior.ts)

Also, with the multicast operator, together with the subject factory parameter, you can further parametrize the behaviour that you seeks after completion. cf,. ReactiveX/rxjs#1363

@aspergillusOryzae
Copy link

aspergillusOryzae commented Feb 1, 2018

I stumbled on this while trying to get something similar in Angular 4 and thought it may help someone. I cannot yet get the code to work in Angular, not sure if it's because I'm using a BehaviorSubject instead of a straight Observable, but still working through that issue. In the meantime this fiddle does work: https://jsfiddle.net/aspergillusOryzae/swktjsps/

const source = Rx.Observable.from([
	{ name: 'Joe',  age: 31 }, 
        { name: 'Bob',  age: 25 }, 
        { name: 'Alice', age: 36 },
        { name: 'Cecil', age: 18 },
	{ name: 'Dave', age: 45 },
  ]);

// function to get subscription based on input age
function getSubOverAge(modVal) {
  return source.filter(num => num.age >= modVal);
}

//output
const subscribe = getSubOverAge(20).subscribe(val => console.log(`Over 20: ${val.name}`));
const subscribe2 = getSubOverAge(30).subscribe(val => console.log(`Over 30: ${val.name}`));
const subscribe3 = getSubOverAge(40).subscribe(val => console.log(`Over 40: ${val.name}`));

/**
OUTPUT
Over 20: Joe
Over 20: Bob
Over 20: Alice
Over 20: Dave
Over 30: Joe
Over 30: Alice
Over 30: Dave
Over 40: Dave
**/

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants