Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

combineWithMostRecent #405

Closed
samuelgruetter opened this issue Sep 23, 2013 · 41 comments
Closed

combineWithMostRecent #405

samuelgruetter opened this issue Sep 23, 2013 · 41 comments
Milestone

Comments

@samuelgruetter
Copy link
Contributor

I'm looking for an operation which does the following:

Whenever Observable o1 emits an item, combine this item with the most recent item of Observable o2.

Illustration:

----A-------B------C----->  o1

--0----1-2----3-4-------->  o2

    |       |      |
    V       V      V

  (A,0)   (B,2)  (C,4)

I can't find a nice way of doing this.
Can anyone help me?
Or do we need to add a new operation to rx.Observable?

@jmhofer
Copy link
Contributor

jmhofer commented Sep 23, 2013

Actually, combineLatest together with distinctUntilChanged, with equality based on the tuple projection would work here I think. Maybe there's an easier way, but this is what first came to my mind...

@samuelgruetter
Copy link
Contributor Author

But what if o1 emits two equal items in sequence?

@abersnaze
Copy link
Contributor

If that is part of your problem we're discussing the distinct operator here. #395 (comment)

@jmhofer
Copy link
Contributor

jmhofer commented Sep 24, 2013

Maybe this is a Join use case? I found a description here.

@headinthebox
Copy link
Contributor

Join is hardly ever used, and the semantics are subtle. and because of all the functions you need to pass pretty nasty if you don't have groupBy query comprehension syntax.

But implementing it would be a fun task for anyone that wants to dig down to the next level of detail.

Watch http://channel9.msdn.com/Series/Rx-Workshop/Rx-Workshop-7-Reactive-Coincidence first ...

On Sep 24, 2013, at 7:32 AM, Joachim Hofer notifications@github.com wrote:

Maybe this is a Join use case? I found a description here.


Reply to this email directly or view it on GitHub.

@jmhofer
Copy link
Contributor

jmhofer commented Sep 24, 2013

This means that maybe it's a good idea to add combineWithMostRecent (as described by @samuelgruetter) as an operator? - To me it sounds like a relatively frequent use case.

@zorba128
Copy link

zorba128 commented Jan 2, 2014

I have similar problem - came up with:

public static <T, U, R> Observable<R> enrich(Observable<T> source, Observable<U> data, Func2<T, U, R> f) {
    return Observable.zip(source, data.sample(source), f);
}

but noticed sample() doesn't really sample input - it doesn't emit last value multiple times when no change occured between timer, causing zip operation to get out of sync.

Isn't that a bug (or at least serious documentation problem) with sample()?

marcin

@samuelgruetter
Copy link
Contributor Author

I agree, that's inconsistent: sample(long, TimeUnit) emits the last value multiple times if the source observable didn't emit a new value between two ticks, but sample(Observable<U>) does not repeat it, and this line suggests it was done on purpose. But I think sample(Observable<U>) should be changed to match the behavior of sample(long, TimeUnit).

@benjchristensen
Copy link
Member

How about this?

    Observable.combineLatest(a, b, { x, y -> [x, y] })
        .distinctUntilChanged({ tuple -> tuple[0]})
        .distinctUntilChanged({ tuple -> tuple[1]})
        .toBlockingObservable().forEach({ v-> println(v)})

@samuelgruetter
Copy link
Contributor Author

Regarding issue1: How to get combineWithMostRecent/enrich behavior: What if the source Observable a emits two equal values in succession? We still want to see this in the resulting Observable, so we can't use an approach with distinctUntilChanged.

Regarding issue2: Is the behavior of sample(Observable<U>) correct? @benjchristensen what do you think?

@benjchristensen
Copy link
Member

The sample operator was fixed in v0.18.2.

@samuelgruetter
Copy link
Contributor Author

Note: The sample operator was indeed fixed, but not the way @zorba128 and me would have expected. sample(Observable<U>) and sample(long, TimeUnit) are now consistent and both do not emit the last value multiple times if it hasn't changed between two sampler ticks.

[I've reread this thread because this question was asked on stackoverflow].

@benjchristensen
Copy link
Member

Would something like the following work:

observableA.combineWithMostRecentFrom(observableB)

This would different from combineLatest as it would not emit all permutations. It would only emit when A emits and take whatever the last from B was.

Then the A observable could be the "slow one" and we just grab whatever the last from B was.

@staltz
Copy link
Member

staltz commented Nov 29, 2014

Guys, guys, we need to solve this one. I've been using this "combineWithMostRecent" or "enrich" (I call it "combinePrev") many times in an Android project in production. It's very useful. Basically what we need is the asymmetric version of combineLatest. I used to have an implementation of it based on join, and I informed @mattpodwysocki that it would be good if we could do that in RxJS too, see Reactive-Extensions/RxJS#335 (comment).

My implementation of the operator with join is this:

 /**
   * Similar to combineLatest, except it joins the latestitems from the two source
   * observables only when the first observable emits an item.
   * 
   * first:  ------f----f-------------f--------------f----|>
   * second: ---s1----------s---s--s3-----s-s-s-s4--------|>
   * result: ------R1---R1------------R3-------------R4---|>
   * 
   * @param first
   *        The first source Observable
   * @param second
   *        The second source Observable
   * @param combineFunction
   *        the aggregation function used to combine the items emitted by the source
   *        Observables
   * @return an Observable that emits items that are the result of combining the items
   *         emitted by the source Observables by means of the given function
   */
  public static <TFirst, TSecond, R> Observable<R> combinePrev(
    final Observable<TFirst> first, 
    final Observable<TSecond> second, 
    final Func2<TFirst, TSecond, R> combineFunction) 
  {
    final Observable<TSecond> hotSecond = second.publish().refCount();
    return first.join(hotSecond, new Func1<TFirst, Observable<Object>>() {
      public Observable<Object> call(final TFirst it) {
        return Observable.<Object>empty();
      }
    }, new Func1<TSecond, Observable<TSecond>>() {
      public Observable<TSecond> call(final TSecond it) {
        return hotSecond;
      }
    }, combineFunction);
  }

But I just figured a much simpler implementation based only on map and switch, basically this:

A.map({a -> B.map({b -> [a, b]})}).switch()

See this StackOverflow answer I wrote.

@benjchristensen
Copy link
Member

not the way @zorba128 and me would have expected

What was expected? It emits the last item in a given time window if something was emitted. If nothing was emitted then nothing is emitted at the end of the time window.

http://reactivex.io/RxJava/javadoc/rx/Observable.html#sample(long,%20java.util.concurrent.TimeUnit)

@benjchristensen
Copy link
Member

we need to solve this one

Sure, let's get it solved.

Once #1905 is confirmed let's add a new operator marked with @Beta and make sure it works well for everyone and then in 1.1 or 1.2 we can mark it as stable and remove the @Beta marker.

@staltz
Copy link
Member

staltz commented Nov 29, 2014

So we just need to find a proper name for it. I prefer shorter names, but enrich didn't ring a bell to me. Maybe sampleCombine?

@benjchristensen
Copy link
Member

It's not quite sample though, as it's not sampling with a time interval, it really is just taking whatever the last value is, similar to BehaviorSubject, or BlockingObservable.latest().

The static Observable.combinateLatest combines all permutations of all Observables it combines.

To confirm, here we want to combine every value from one Observable with the latest or most recent of another, correct?

It feels like an instance method of combineWithMostRecent or combineWithLatest. Or is combine too confusing with the static combineLatest that does all permutations?

It ends up being very similar to zipWith if we had a zipWithLatest variant?

Does this have a proper name in Haskell, Scala or some other functional language that I'm unaware of?

@headinthebox Your input on this would be helpful.

@benjchristensen benjchristensen added this to the 1.1 milestone Nov 29, 2014
@headinthebox
Copy link
Contributor

Let me think what the shortest way to implement this is using the existing combinators; cant believe it is very long but I am jetlagged ;-)

@akarnokd
Copy link
Member

This appears to be producing the expected results:

public class CombineWhenOther {
    public static void main(String[] args) {
        PublishSubject<Integer> source = PublishSubject.create();
        BehaviorSubject<Integer> other = BehaviorSubject.create();


        source.concatMap(e -> other.take(1).map(f -> e + f))
        .subscribe(System.out::println, Throwable::printStackTrace, 
                       () -> System.out.println("Done"));

        source.onNext(1);
        other.onNext(10);
        other.onNext(20);
        other.onNext(30);
        source.onNext(2);
        source.onNext(3);
        other.onNext(40);
        source.onCompleted();
    }
}

But both sources are hot, and since we don't have multicast(), I don't know how to convert a general other Observable to BehaviorSubject with the stable API.

@staltz
Copy link
Member

staltz commented Nov 30, 2014

Sorry, with this implementation

A.map({a -> B.map({b -> [a, b]})}).switch()

I forgot to mention that B must be hot.

It's not quite sample though, as it's not sampling with a time interval, it really is just taking whatever the last value is, similar to BehaviorSubject, or BlockingObservable.latest().

It is sample as in a.sample(b) combined with b. It is not sampling with a time interval, it is sampling with b as the sampler. See this jsfiddle (c is a.sample(b)).

The static Observable.combineLatest combines all permutations of all Observables it combines.

To confirm, here we want to combine every value from one Observable with the latest or most recent of another, correct?

It feels like an instance method of combineWithMostRecent or combineWithLatest. Or is combine too confusing with the static combineLatest that does all permutations?

If we could afford renaming existing operators, one suggestion is combineSymmetric for combineLatest and combineAsymmetric for this new one. The problem with names such as combineWithMostRecentand combineWithLatest is that in English they mean basically the same as combineLatest, and a lot of confusion can emerge from this ambiguity.

Another thing to keep in mind is that this new operator is an instance method, and shouldn't have a static version. Because of the asymmetric behavior, there should be one source Observable that commands the emission of the resulting Observable.

If we take that into consideration, we could name it withLatest, since it will be always applied on some source observable a:

c = a.withLatest(b, combineFunction)

Another insight is that since the c observable emits at the same time a emits, we can take advantage of the map concept. mapWithLatest could work as a name.

My humble suggestions are then either sampleCombine or withLatest or mapWithLatest.

@samuelgruetter
Copy link
Contributor Author

Would something like the following work:

observableA.combineWithMostRecentFrom(observableB)

This would different from combineLatest as it would not emit all permutations. It would only emit when A emits and take whatever the last from B was.

That's exactly what I was looking for.

not the way @zorba128 and me would have expected

What was expected? It emits the last item in a given time window if something was emitted. If nothing was emitted then nothing is emitted at the end of the time window.

http://reactivex.io/RxJava/javadoc/rx/Observable.html#sample(long,%20java.util.concurrent.TimeUnit)

For example, if I sample an audio signal s at a frequency of 44100 Hz, I expect to get one sample every 1/44100 seconds, no matter what the shape of s is. Taking this analogy to observables, I'd expect that

myObservable.sample(50 milliseconds)

emits an element every 50 milliseconds, no matter when myObservable emits how many items. That is, I'd expect that if nothing was emitted in the time window, the last value is repeated.

But I agree that the way RxJava understands "sample" also makes sense, and it's well explained in the docs what happens, so I'm not saying we should change anything.

If we take that into consideration, we could name it withLatest, since it will be always applied on some source observable a:

c = a.withLatest(b, combineFunction)

I think withLatest would be a nice name.

@dvtomas
Copy link

dvtomas commented Dec 1, 2014

I agree this is a very useful operator, and, @staltz , I like your map - switch implementation. Just for cross-reference, I have already raised this issue (#912) some time ago.

Naming the new operator indeed is difficult now that sample is taken. Perhaps sampleEach could be considered, to express that there is a difference in behavior to sample, while still asserting that it is sampling in some sense?

@dvtomas
Copy link

dvtomas commented Dec 3, 2014

Or, what about combineSampled? I like how it is simillar to combineLatest - both do combining, both have same type signatures ((T, U) => (T, U)), they but they differ in what they combine - either latest values, or values taken at times defined by the sampler. They even sort close alphabetically, so the user can see he has a choice, what suits him best.

sampleEach could be used as an alternative to just sample, with slightly different behavior (not filtering out non-changed values), if need for it's inclusion in the library ever arises (but it can be trivially replaced by combineSampled, so it will probably not).

@staltz
Copy link
Member

staltz commented Dec 3, 2014

@dvtomas if we would use a.combineSampled(b, combineFunction), it would sound like we are combining a with b.sample(something) while in reality we are combining a.sample(b) with b.

@akarnokd
Copy link
Member

akarnokd commented Dec 3, 2014

Or a.combineWithLatestOf(b, throughAFunction).

@dvtomas
Copy link

dvtomas commented Dec 3, 2014

@staltz Sorry, can't see it. My mind is already too deeply connected with my interpretation.. Also, I work in scala, the combineFunction would probably be absent, it would be just a combineSampled b, I haven't thought of Java.

@akarnokd That sounds reasonable wrt to combineLatest being widely understood and used. I had to go through some pondering about what the Latest part in combineLatest really means first, to appreciate that...

@staltz
Copy link
Member

staltz commented Dec 3, 2014

it would be just a combineSampled b, I haven't thought of Java.

So to clarify, which of a or b do you think is being sampled by the other, in this a combineSampled b idiom? How are you "reading" this in plain english?

@dvtomas
Copy link

dvtomas commented Dec 3, 2014

@staltz I see your point now. It reads roughly as A combine with sampled B (sampled with what?). That's not right..

I like @akarnokd's combineWithLatestOf the best so far.

@staltz
Copy link
Member

staltz commented Dec 3, 2014

@dvtomas precisely, when we say "sampled B", we think "we take samples of B" which to RxJava translates to b.sample(something), which does not happen in reality. I don't want this to become a gotcha.

@staltz
Copy link
Member

staltz commented Dec 16, 2014

Now implemented in RxJS as withLatestFrom. I would make a PR in RxJava as well, but I'm having a hard time navigating through the core in the codebase, maybe someone else familiar with the codebase could implement it?

The implementation can be roughly

A.map({a -> hotB.map({b -> [a, b]})}).switch()

Or a state machine like I did in RxJS.

@staltz
Copy link
Member

staltz commented Feb 19, 2015

http://stackoverflow.com/questions/28580490/rxjava-how-to-emulate-withlatestfrom
People are asking for this operator.

Some one please implement it?

@JakeWharton
Copy link
Contributor

👍 I have had 4 separate instances of need of this in the last two weeks that I felt dirty working around!

@akarnokd
Copy link
Member

I'll do this.

@staltz
Copy link
Member

staltz commented Feb 19, 2015

👍 👍

@akarnokd
Copy link
Member

See #2760 for the proposed name and behavior.

@JakeWharton
Copy link
Contributor

This issue can be closed.

@abersnaze
Copy link
Contributor

The window operator looks like an interesting option for building this. The initial drawing looks almost exactly like the marble diagram for the operator.

window marble diagram

@abersnaze
Copy link
Contributor

Damn it, I couldn't stop thinking about this all night.

package asdf;

import static rx.Observable.zip;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public class Main {
    private static class Tuple {
        public final String t;
        public final int i;

        Tuple(String t, int i) {
            this.t = t;
            this.i = i;
        }

        @Override
        public String toString() {
            return t + ":" + i;
        }
    }

    public static void main(String[] args) {
        Subject<String, String> trigger = PublishSubject.create();
        Subject<Integer, Integer> data = PublishSubject.create();

        trigger.publish(trigger_ -> {
            return zip(trigger_, data.window(trigger_).flatMap(window -> window.lastOrDefault(-1)), Tuple::new);
        }).scan((last, curr) -> curr.i == -1 ? new Tuple(curr.t, last.i) : curr).subscribe(System.out::println);

        data.onNext(0);
        trigger.onNext("A");
        data.onNext(1);
        data.onNext(2);
        trigger.onNext("B");
        trigger.onNext("C");
        data.onNext(3);
        data.onNext(4);
        trigger.onNext("D");
    }
}

produces the output

A:0
B:2
C:2
D:4

It would simpler if you don't need the triggers value.

package asdf;

import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public class Main {
    public static void main(String[] args) {
        Subject<String, String> trigger = PublishSubject.create();
        Subject<Integer, Integer> data = PublishSubject.create();

        data.window(trigger).flatMap(window -> window.lastOrDefault(-1)).scan((last, curr) -> curr == -1 ? last : curr).subscribe(System.out::println);

        data.onNext(0);
        trigger.onNext("A");
        data.onNext(1);
        data.onNext(2);
        trigger.onNext("B");
        trigger.onNext("C");
        data.onNext(3);
        data.onNext(4);
        trigger.onNext("D");
    }
}

produces the output

0
2
2
4

@staltz
Copy link
Member

staltz commented Aug 20, 2015

@abersnaze or just

A.switchMap({a -> hotB.map({b -> [a, b]})})

@abersnaze
Copy link
Contributor

I had to change the variable names to grok it. Much better than mine.
data.switchMap(i -> trigger.map(t -> new Tuple(t, i))).subscribe(System.out::println);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

10 participants