Examples from contributor days #2324
Comments
This is fantastic!
Thanks for adding this as a checklist. I've been hoping to come back to a few of these with some narrative-style docs. 😄
This looks like a nice place to start contributing. When I started with RxJS, most available examples were for Rx 4 (they still are?), and I kept looking up the migration guide. This would make things a lot simpler for people picking up RxJS.
Feel free to pick a topic and start focusing on the content. It could be a blog post or some new markdown to incorporate here as narrative-style docs.
I did start writing a blog post, but I've been too busy to continue it; I should get back to it soon. Thanks for the reminder :)
@ladyleet I don't want to lose track of this for the docs team... can you please bring this to their attention? I'll ping them too |
Any progress on documenting these topics? |
We want to start with some kind of recipe collection, but there is no progress at the moment as far as I know. I would really appreciate any support on this topic :)
Well, I plan to write an article with scheduler-recursion-reentrancy Stack Overflow examples, but probably not for a month. I will post the link here then.
I created a playground for this: https://codepen.io/kievsash/pen/QZONpW If someone adds more interesting real-world issues caused by improper Scheduler usage with recursive calls, that would be great.
Just published a doc (article) about the difference between the queue scheduler and no scheduler: https://medium.com/@alexanderposhtaruk/so-how-does-rx-js-queuescheduler-actually-work-188c1b46526e
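To make the queue-vs-no-scheduler discussion concrete without pulling in RxJS: the core trick behind a queue scheduler is a trampoline, where nested `schedule()` calls are queued and drained by one flat loop instead of recursing on the call stack. This is only a minimal sketch of the idea (`QueueScheduler` here is an illustrative name, not the RxJS class):

```typescript
// Minimal trampoline-style queue scheduler, analogous in spirit to
// RxJS's queueScheduler: tasks scheduled from inside a running task are
// queued and drained iteratively rather than invoked recursively.
type Task = () => void;

class QueueScheduler {
  private queue: Task[] = [];
  private draining = false;

  schedule(task: Task): void {
    this.queue.push(task);
    if (this.draining) return; // a drain loop is already running; just enqueue
    this.draining = true;
    while (this.queue.length > 0) {
      this.queue.shift()!(); // tasks scheduled inside a task land back here
    }
    this.draining = false;
  }
}

// Demo: recursion through the scheduler runs in one flat drain loop,
// so deep "recursion" no longer grows the call stack.
const log: number[] = [];
const scheduler = new QueueScheduler();

function countDown(n: number): void {
  log.push(n);
  if (n > 0) scheduler.schedule(() => countDown(n - 1));
}

scheduler.schedule(() => countDown(3));
// log is [3, 2, 1, 0], produced iteratively instead of via nested frames
```

Without the scheduler, `countDown(3)` calling itself directly would nest four stack frames; with it, every step is a separate queued task.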
@benlesh The previous discussion on the backpressure story has been locked (#71), so I'll try to restart the discussion on one of the examples above here. I've created my own custom backpressure operator built from existing RxJS operators, and I would like the RxJS community's opinion on my implementation compared to other approaches out there.
```ts
import { generate, merge, Observable, OperatorFunction, Subject, zip } from "rxjs";
import { concatMap, finalize } from "rxjs/operators";

/**
 * @param consumer function mapping each source value to the Observable that processes it
 * @param bufferSize number of values requested from the source up front
 */
export const backpressure = <T, R>(
  consumer: (value: T, index: number) => Observable<R>,
  bufferSize: number = 10,
): OperatorFunction<T, R> => {
  return (source$: Observable<T>): Observable<R> => {
    // subject for notifying / pulling the next value from the source observable
    const trigger$ = new Subject<void>();
    // initial notifications: these admit the first n "working" values
    const buffer$ = generate(0, (n) => n < bufferSize, (i) => i + 1);
    // notifier stream used to pull the next value
    const notifiers$ = merge(trigger$, buffer$);
    // each time the notifier observable emits, the next value from the
    // source is taken and pumped through the consumer observable
    return zip(source$, notifiers$).pipe(
      concatMap(([value], index) => consumer(value, index).pipe(
        // trigger the next pull as soon as one consumer observable completes
        finalize(() => trigger$.next())
      ))
    );
  };
};
```
```ts
import { from, of, range } from "rxjs";
import { bufferCount, delay, map, mergeMap, tap, toArray } from "rxjs/operators";
import { backpressure } from "./backpressure";

function random(max = 1000) {
  return Math.floor(Math.random() * Math.floor(max));
}

const total = 30;
const bufferSize = 5;
const batchSize = 10;
const timeout = 5000;

// apply backpressure to the whole range
const numbers$ = range(1, total).pipe(
  backpressure((value, index) => of(value).pipe(
    // simulate work
    delay(random(timeout))
  ), bufferSize)
);
numbers$.subscribe((result) => console.log(result));

// apply backpressure to each buffer group independently
const batches$ = range(1, total).pipe(
  // split values by batch size
  bufferCount(batchSize),
  tap((batch) => console.log("process batch", batch)),
  mergeMap((batch) => from(batch).pipe(
    // apply backpressure within each batch
    backpressure((value, index) => of(value).pipe(
      // simulate work
      delay(random(timeout))
    ), bufferSize),
    toArray()
  )),
  map((result) => result.sort((a, b) => a - b))
);
batches$.subscribe((batch) => console.log("finish batch", batch));
```
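For readers without an RxJS setup handy, the pull idea behind the operator can be sketched with plain Promises: the source is only advanced when a processing "slot" frees up, so at most `bufferSize` work items are ever in flight. `pullWithBackpressure` and its parameters are illustrative names for this sketch, not part of any library:

```typescript
// Promise-based analogue of the pull-driven backpressure above: the
// source iterator is advanced only when a completed work item (or one
// of the initial bufferSize "notifications") frees a slot.
async function pullWithBackpressure<T, R>(
  source: Iterable<T>,
  consumer: (value: T, index: number) => Promise<R>,
  bufferSize: number,
): Promise<R[]> {
  const it = source[Symbol.iterator]();
  const results: R[] = [];
  let index = 0;
  let active = 0;

  return new Promise((resolve, reject) => {
    const pullNext = (): void => {
      const next = it.next();
      if (next.done) {
        if (active === 0) resolve(results); // source drained, nothing in flight
        return;
      }
      const i = index++;
      active++;
      consumer(next.value, i).then((r) => {
        results[i] = r; // keep source order regardless of completion order
        active--;
        pullNext(); // a completed item frees a slot: pull the next value
      }, reject);
    };
    // prime bufferSize slots, mirroring the generate(...) notifiers above
    for (let k = 0; k < bufferSize; k++) pullNext();
  });
}

// usage sketch: process 1..6 with at most 2 items in flight
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
pullWithBackpressure([1, 2, 3, 4, 5, 6], async (v) => {
  await sleep(10); // simulate work
  return v * 10;
}, 2).then((out) => console.log(out)); // [10, 20, 30, 40, 50, 60]
```

Because values are pulled on demand, nothing ever accumulates in an unbounded buffer, which is exactly the property the `zip`-based version is trying to approximate.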
In the example above, if the source produces many values, `zip` has to buffer them. With intensive source emissions this can exhaust memory. Of course, that cannot be solved unless we can suppress the source's emissions themselves.
@kievsash I thought about that aspect. The user of the operator is still able to throttle the source stream or apply any buffering strategy on the input source. |
@olegomon Interesting topic to discuss, by the way. Two possibilities:
Bump. This issue makes me sad every time I look at it. There's so much to do, and not enough help. 😿 |
I would like to contribute to the Request/Reply pattern on WebSockets topic. I made an example of an async request/response method here: Should I:
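The essence of the Request/Reply pattern over WebSockets is correlating each reply with the request that caused it, usually via an id and a map of pending Promises. This is only a sketch of that correlation logic under assumed names (`RequestReplyChannel`, `handleReply`, and the injected `send` are all illustrative; a real implementation would wire `send` and `handleReply` to `socket.send` and `socket.onmessage`):

```typescript
// Request/reply correlation: each outgoing message carries an id, and a
// pending map resolves the matching Promise when the reply arrives.
type Reply = { id: number; payload: unknown };

class RequestReplyChannel {
  private nextId = 0;
  private pending = new Map<number, (reply: Reply) => void>();

  constructor(private send: (msg: { id: number; payload: unknown }) => void) {}

  request(payload: unknown): Promise<Reply> {
    const id = this.nextId++;
    return new Promise<Reply>((resolve) => {
      this.pending.set(id, resolve); // remember who is waiting for this id
      this.send({ id, payload });
    });
  }

  // in a real implementation, call this from socket.onmessage
  handleReply(reply: Reply): void {
    const resolve = this.pending.get(reply.id);
    if (resolve) {
      this.pending.delete(reply.id); // each id is answered at most once
      resolve(reply);
    }
  }
}

// loopback demo: the "server" simply echoes payloads back asynchronously
const channel = new RequestReplyChannel((msg) =>
  setTimeout(() => channel.handleReply({ id: msg.id, payload: msg.payload }), 0),
);
channel.request("ping").then((r) => console.log(r.payload)); // "ping"
```

A production version would also reject pending entries on socket close or timeout so callers never hang forever.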
At RxJS contributor days we discussed adding an examples/ folder to the repo and populating it with some real use cases for RxJS. Here are the examples we came up with: