bufferTime minBufferSize #2601

Open
dooreelko opened this issue May 11, 2017 · 6 comments

Comments

@dooreelko

Hello.
I think it would also be very helpful to have a minBufferSize parameter that prevents the buffer from emitting when there are not enough source values, so that the pipeline does not need to check whether there is actually something to process. Currently an empty array is emitted.

To keep the current behaviour, the default would be 0.
A value of X would mean "emit after the delay only if the buffer has at least X elements, otherwise emit immediately after one is available".

What do you think?
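
To make the proposed semantics concrete, here is a minimal, dependency-free TypeScript sketch that models them on a virtual timeline. It is not RxJS code and not an existing bufferTime option: `simulateBufferTimeMin`, `TimedValue` and `Emission` are hypothetical names introduced for illustration. It also assumes one particular reading of the proposal: the timer keeps ticking on a fixed schedule, and once a tick has passed with too few values, the buffer is emitted as soon as it reaches minBufferSize.

```typescript
interface TimedValue<T> { time: number; value: T }
interface Emission<T> { time: number; values: T[] }

// Models the proposed behaviour over a list of time-sorted events.
// Assumption: ticks happen at span, 2 * span, ... up to endTime.
function simulateBufferTimeMin<T>(
  events: TimedValue<T>[],
  span: number,
  minBufferSize: number,
  endTime: number,
): Emission<T>[] {
  const emissions: Emission<T>[] = [];
  let buffer: T[] = [];
  let overdue = false; // true once a tick passed without enough values
  let next = 0;        // index of the next event to consume

  const flush = (time: number): void => {
    emissions.push({ time, values: buffer });
    buffer = [];
    overdue = false;
  };

  for (let tick = span; tick <= endTime; tick += span) {
    // Consume every event that happens up to and including this tick.
    while (true) {
      const event = events[next];
      if (event === undefined || event.time > tick) break;
      buffer.push(event.value);
      // A delayed buffer is emitted as soon as it is large enough.
      if (overdue && buffer.length >= minBufferSize) flush(event.time);
      next++;
    }
    // At the tick itself, emit only if the buffer is large enough.
    if (buffer.length >= minBufferSize) flush(tick);
    else overdue = true;
  }
  return emissions;
}

const sample: TimedValue<string>[] = [
  { time: 250, value: "a" },
  { time: 300, value: "b" },
  { time: 900, value: "c" },
];

// minBufferSize = 0 reproduces today's behaviour: one array per tick, empty or not.
console.log(simulateBufferTimeMin(sample, 200, 0, 1000));
// minBufferSize = 1 never emits an empty array.
console.log(simulateBufferTimeMin(sample, 200, 1, 1000));
```

With these sample events, the first call yields five arrays (three of them empty), while the second yields only the three non-empty ones. Other readings of "emit immediately" are possible, for example restarting the timer after a delayed emission.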

@msc117

msc117 commented May 27, 2017

I'm running into the same issue. I have an event stream where I want a timed buffer that starts at the first emitted value, rather than an endless series of empty buffers.

Edit: I was able to do this with bufferToggle.
@Durilka https://github.com/btroncone/learn-rxjs/blob/master/operators/transformation/buffertoggle.md
The tricky part was figuring out that throttleTime has to be used on the openings Observable:

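      // Open a buffer at most once every 200 ms when the source emits, and close it 200 ms after it opens
      this.source.bufferToggle(this.source.throttleTime(200), () => Observable.timer(200))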
         .subscribe(buffered => {
            ...
         });

@zlepper

zlepper commented Aug 28, 2017

This just bit me in the butt. I had expected buffer to emit only when there actually were new values (like normal observables), and not whenever the timer ticked. A minBufferSize could definitely fix that.

@NathanBraslavski

NathanBraslavski commented Mar 12, 2019

Another simple fix would be to filter out empty buffers:

.pipe(
    bufferTime(200),
    filter(buffer => buffer.length > 0),
    ...
)

But I agree that the library should support this in a more efficient way.

@benlesh added the AGENDA ITEM label (flagged for discussion at core team meetings) and removed the type: discussion and AGENDA ITEM labels on Feb 20, 2021
@toverux

toverux commented Jun 6, 2022

Posting this answer since it may help other people who (like me) found this issue while searching for the right operator.
In my use case this seems to do the job well:

events$.pipe(buffer(events$.pipe(auditTime(DELAY))))

(You may want to share() your events$ observable beforehand, since it is subscribed to twice here.)

This will wait for events.
When an event arrives, it will start buffering for the next DELAY milliseconds.
When that period is over the batch of events is emitted as an array, and we start over.

You can replace auditTime with an operator that better suits your needs.
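
The timing described above can be modelled with a small, dependency-free TypeScript sketch. It is only a model of the behaviour, not the RxJS implementation: `batchAfterFirstEvent`, `TimedEvent` and `Batch` are hypothetical names, events are assumed to be sorted by time, and an event arriving exactly when a window ends is assumed to start the next window.

```typescript
interface TimedEvent<T> { time: number; value: T }
interface Batch<T> { emittedAt: number; values: T[] }

// Groups time-sorted events into batches: the first event opens a window of
// `delay` milliseconds, everything inside that window joins the batch, and
// nothing is produced while no events arrive.
function batchAfterFirstEvent<T>(events: TimedEvent<T>[], delay: number): Batch<T>[] {
  const batches: Batch<T>[] = [];
  let current: Batch<T> | undefined;

  for (const event of events) {
    // Close the open window if this event arrives at or after its end.
    if (current !== undefined && event.time >= current.emittedAt) {
      batches.push(current);
      current = undefined;
    }
    // The first event after a quiet period opens a new window.
    if (current === undefined) {
      current = { emittedAt: event.time + delay, values: [] };
    }
    current.values.push(event.value);
  }
  // The last window still closes once its delay has elapsed.
  if (current !== undefined) batches.push(current);
  return batches;
}

const clicks: TimedEvent<string>[] = [
  { time: 100, value: "a" },
  { time: 150, value: "b" },
  { time: 320, value: "c" },
  { time: 1000, value: "d" },
];
console.log(batchAfterFirstEvent(clicks, 200));
```

For these sample events the model yields three batches, emitted at 300, 520 and 1200, and none of them is empty, which is the main difference from bufferTime.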

@nick-bailey-uk

Filtering empty events simply hides the problem. I noticed CPU utilization of about 10% on a page with 4 controls using bufferTime(250), when it should have been near zero. It is clearly incorrect to continually emit empty values when the documentation itself states 'Collects values from the past as an array, and emits those arrays periodically in time.' If there is nothing to collect, then it shouldn't emit anything.

@ArunKumarBharathi

How can I add a rate limit to @toverux's buffer and auditTime approach, for example by using the take operator?
