combine on synchronous sources from array #414

Closed
staltz opened this issue Feb 24, 2017 · 9 comments

@staltz
Contributor

staltz commented Feb 24, 2017

Given this code

const a$ = most.from([1,2])
const b$ = most.from([10]);

const c$ = most.combine((a,b)=>a+b, a$, b$);

c$.subscribe({next: c => console.log(c)});

The log shows

12

While most people would expect it to display

11
12

By the way, I'm raising these issues also for xstream (staltz/xstream#173) and RxJS (ReactiveX/rxjs#2414). Is this a bug or a feature? I'm not sure. What are people's ideas and opinions on this case?

@TylorS
Collaborator

TylorS commented Feb 24, 2017

I think this is a side effect of how most stream libraries subscribe to each stream in the order it is given.

Most.js delays emissions until source.run() (the equivalent of xstream's Producer.start()) returns, but after that all events are generally synchronous unless asynchrony is introduced via delay() or other combinators. So in this example, most.from() schedules a task that, when run, synchronously emits all the values of the array as fast as it can.
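As a minimal sketch of that ordering (assuming most 1.x and its default scheduler), nothing is emitted while observe()/run() is still on the stack, and then the array values arrive back-to-back:

const a$ = most.from([1, 2]);

console.log('before observe');
a$.observe(x => console.log('value', x)).then(() => console.log('done'));
console.log('after observe');

// expected order: "before observe", "after observe", "value 1", "value 2", "done"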

I'm not sure it's totally worth fixing, as I don't actually see sources like most.from() as "true" event streams.

Thanks for the issue. I'm sure it will lead to an interesting discussion.

@TylorS
Collaborator

TylorS commented Feb 24, 2017

Here's a quick copy/paste of the OP's example in an esnextbin for those interested: https://esnextb.in/?gist=4a35680f3005d6267c37ad40a000b805

@staltz
Contributor Author

staltz commented Feb 24, 2017

Yeah, I also see heavy use of multiple synchronous streams as pushing the limits of the paradigm too far. It becomes the wrong tool for the job (where e.g. arrays could do better). That said, I'm mostly worried about whether this issue expresses itself in more subtle use cases, because of course the example given above is contrived and small.

@TylorS
Collaborator

TylorS commented Feb 24, 2017

After some discussion with @briancavalier, I think there are basically 4 options for these issues (across each library):

  1. "Solve" the confusion via documentation. Ensure it is clearly marked that synchronously emitting "events" are not just synchronous but simultaneous, and that combine always observes the last (as in order not time) simultaneous event.
  2. Do lots of buffering (large performance impact)
  3. Remove these APIs that don't produce "true" events. Instead provide a kind of overlay for arrays of values that operate on top of streams that do produce predictable and sane timing behaviors.
  4. Asynchronously emit each value one after another, which also feels very unexpected just like the current behavior.
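For reference, option 4 can already be approximated in user land by spacing the array values out with delay(); this is only a sketch under most 1.x, not a proposal for a new API:

const a$ = most.from([1, 2]).concatMap(x => most.of(x).delay(1));
const b$ = most.from([10]);

most.combine((a, b) => a + b, a$, b$)
  .observe(c => console.log(c)); // 11, then 12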

@TylorS
Collaborator

TylorS commented Feb 24, 2017

Regarding number 3, it could be something like this:

import { Stream } from 'most';

// Wraps an existing stream and replaces each of its event values with the next
// value of the array, ending once the array is exhausted.
export function mapArray(array, stream) {
  return new Stream(new MapArray(array, stream.source));
}

class MapArray {
  constructor(array, source) {
    this.array = array;
    this.source = source;
  }

  run(sink, scheduler) {
    return this.source.run(new MapArraySink(this.array, sink), scheduler);
  }
}

class MapArraySink {
  constructor(array, sink) {
    this.array = array;
    this.length = array.length;
    this.index = 0;
    this.sink = sink;
  }

  event(time) {
    const value = this.array[this.index];

    this.sink.event(time, value);

    // end as soon as the last array value has been emitted
    if (++this.index >= this.length)
      this.sink.end(time, value);
  }

  error(time, err) { this.sink.error(time, err); }

  end(time, value) { this.sink.end(time, value); }
}

// possible usage (assuming periodic and observe are also imported from 'most')

const stream = mapArray([1, 2, 3], periodic(100));

observe(console.log, stream);

This would give an array of values a consistent and expected time component.
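A lighter-weight variation on the same idea, assuming most 1.x (with zip, from, periodic, and observe imported from 'most'), is to zip the array values against a periodic "clock": zip buffers the synchronously emitted array values, pairs each with one tick, and ends once the array stream's buffered values are consumed:

const ticked = zip((x, tick) => x, from([1, 2, 3]), periodic(100));

observe(console.log, ticked); // 1, 2, 3, one value per tick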

@trxcllnt

While zip is generally applicable to lists, combineLatest is undefined without the time dimension. The current behavior is fine, because the solution is to introduce your own notion of time. Rx generally shies away from defining one for you.

@briancavalier
Member

I'm OK with saying that the current behavior is correct for 1.x. As both @trxcllnt and @TylorS have pointed out, values from an array (or other synchronously available data structure) are not really events, they're simply data, and as such they have no time dimension. There is an implicit time imposed on them based on the instant of observation, but it's artificial. They're simultaneous, but indexed.

For now, I think we can say that when given a bunch of simultaneous indexed values, combine() observes the last ("last" as in index order) one.

What I don't like is that the API allows a user to get into this situation in the first place. The only way to prevent it currently is to tell users not to do it. Let's think about ways we can prevent it in the future. The best solution may turn out to be @TylorS's option 3.

@PEZO19

PEZO19 commented May 9, 2022

@staltz @TylorS @trxcllnt @briancavalier Sorry for poking you all in this already closed issue from 5 years ago, but I have no idea where else to raise such a question; please let me know if you are aware of a better platform for it. I am pretty late to the game and trying to catch up on what has happened and what is happening now in the reactive/JS world.

As I understand it, the libraries trying to establish a "reactive" toolset have quite "doubled down" on only using the next / error / complete "event types" in a stream. I understand that something minimal is preferable, and to me it seems like a sweet spot as well, but I am a bit puzzled why/how I have not encountered attempts to introduce some "extra" event types, which, it seems to me, would solve some of the problems.

Related to this issue, and maybe to similar problems with arrays in streams as well, I am wondering about a fourth, somewhat meta event type like boundary. This "boundary" (/"array") type would be a kind of "extension" to next; the two would work together hand in hand, altering the semantics of the current next type.
The boundary type would have two subtypes: boundary-start ([) and boundary-end (]). You could have multiple of these, but you'd have to make sure there are as many ends as starts, plus the other constraints you'd expect when parsing a regular array/JSON.

That way, I think, the meaning of the streams below would be less ambiguous:

coupled$:    ----[1-2]---|
separate-1$: ----[1]-[2]---|
separate-2$: ----1-2---|    # we could also say that a `next` typed event without any `boundary` fluff is to be automatically interpreted the same way as 1 line above

I assume many other array-related problems could be solved more easily, and I just don't see a real drawback here, at least for such attempts.
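Purely as an illustration of this proposal (none of it exists in most/xstream/RxJS, and the names are made up), the extra event types could be modeled as plain data, and a boundary-aware consumer could fold everything between a matching start/end pair back into an array:

// the coupled$ diagram above, written out as a sequence of typed events
const coupled = [
  { type: 'boundary-start' },
  { type: 'next', value: 1 },
  { type: 'next', value: 2 },
  { type: 'boundary-end' },
  { type: 'complete' },
];

// hypothetical helper: group values between boundaries into arrays,
// pass bare `next` values through unchanged
function collectBoundaries(events) {
  const out = [];
  let buffer = null;
  for (const e of events) {
    if (e.type === 'boundary-start') buffer = [];
    else if (e.type === 'boundary-end') { out.push(buffer); buffer = null; }
    else if (e.type === 'next') (buffer || out).push(e.value);
  }
  return out;
}

console.log(collectBoundaries(coupled)); // [ [ 1, 2 ] ]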

Has there been an attempt to build something on top of that, something that goes beyond next/error/complete? (Even if most/xstream/Rx etc. are already built in a way that makes this impossible.)

Sorry for the long post and for possibly using the wrong platform; please let me know how/where you think I should study these topics more deeply.

@trxcllnt

trxcllnt commented May 9, 2022

@PEZO19 The Observer defines next, error, and complete, because Observable is the dual of Iterable (and Observer is the dual of Iterator). There are some good links on https://reactivex.io/tutorials.html describing this, especially Erik Meijer's articles and talks.
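A rough illustration of that duality (not any library's internals): an Iterator is pulled from by the consumer, an Observer is pushed into by the producer, but both surface the same three outcomes, namely a value, an error, or completion:

// pull: the consumer asks for each value
const it = [1, 2, 3][Symbol.iterator]();
for (let r = it.next(); !r.done; r = it.next()) {
  console.log('pulled', r.value);
}

// push: the producer calls the consumer's callbacks
const observer = {
  next: value => console.log('pushed', value),
  error: err => console.error('error', err),
  complete: () => console.log('complete'),
};
[1, 2, 3].forEach(observer.next);
observer.complete();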
