
Memory leak when using concatMap with asynchronous function #2342

Closed
janmarthedal opened this issue Feb 8, 2017 · 4 comments
@janmarthedal

RxJS version:

Version 5.1.0.

Code to reproduce:

const Rx = require('rxjs/Rx');

const buffer_size = 1024;
const dummy_string = Array(10001).join('x');  // ~10 KB payload per value, to make memory growth visible

// A hand-rolled hot observable: values are pushed to every subscriber
// whenever request() is called.
const generator = (function() {
    const subscribers = [];
    let counter = 0;

    const observable = Rx.Observable.create(observer => {
        subscribers.push(observer);
    });

    // Emit `count` increasing integers asynchronously, on the next tick.
    observable.request = count => {
        setTimeout(() => {
            for (let req = count; req > 0; req--) {
                counter++;
                subscribers.forEach(s => s.next(counter));
            }
        }, 0);
    };

    return observable;
})();

// Map a buffer of records to its length, completing asynchronously.
function mapper_async(recs) {
    return Rx.Observable.create(observer => {
        setTimeout(() => {
            observer.next(recs.length);
            observer.complete();
        }, 1);
    });
}

generator
    .map(v => dummy_string + v)
    .bufferCount(buffer_size)
    .concatMap(mapper_async)
    .subscribe(v => {
        console.log(v);
        // Request the next batch once the previous one has been processed.
        generator.request(buffer_size);
    });

generator.request(buffer_size);

Expected behavior:

Memory usage stays bounded.

Actual behavior:

Out of memory (after some time).

Additional information:

If using

function mapper(recs) {
    return Rx.Observable.create(observer => {
        observer.next(recs.length);
        observer.complete();
    });
}

as the mapping function instead of mapper_async, the problem does not occur.

When running the equivalent code with RxJS 4 (https://github.com/Reactive-Extensions/RxJS, v4.1.0), memory does not blow up.

Run on Node.js version 6.9.4.

@benlesh
Member

benlesh commented Feb 10, 2017

Honestly, I'm not entirely sure what the intended goal of the example code is, but concatMap has an unbounded internal buffer, and every time you call that monkey-patched request method you next 1024 values, which in turn calls request 1024 times, nexting 1024 * 1024 values... recursively. So each pass P through that loop floods concatMap's internal buffer with roughly 1024 ^ P more items.
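
For illustration, here is a minimal, self-contained sketch of that buffering behaviour (the range and delay values are arbitrary, not taken from the report): while an asynchronous inner observable is still pending, every further outer value is queued inside concatMap rather than dropped.

const Rx = require('rxjs/Rx');

// Five outer values arrive synchronously, but each inner observable takes
// 10 ms to complete, so values 2..5 sit in concatMap's internal buffer
// until the previous inner observable has finished.
Rx.Observable.range(1, 5)
    .concatMap(n => Rx.Observable.of(n).delay(10))
    .subscribe(n => console.log('inner result for', n));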

The equivalent code does not "blow up" in RxJS 4 because it defaults to queue scheduling, which runs everything breadth-first (non-recursively), so the concatMap buffer gets a chance to flush on each pass. You can do the same thing with RxJS 5 by using the QueueScheduler (Rx.Scheduler.queue), but we don't default to it for performance reasons. What you're doing here isn't a good pattern for concatMap.
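
One way to try that, sketched against the generator, dummy_string, buffer_size and mapper_async definitions from the report above (the observeOn placement is an assumption, not something spelled out in this thread), is to re-emit the source values on Rx.Scheduler.queue so the recursive emissions are trampolined and concatMap's buffer can drain between passes:

generator
    .observeOn(Rx.Scheduler.queue)  // trampoline the emissions instead of recursing
    .map(v => dummy_string + v)
    .bufferCount(buffer_size)
    .concatMap(mapper_async)
    .subscribe(v => {
        console.log(v);
        generator.request(buffer_size);
    });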

@benlesh
Member

benlesh commented Feb 10, 2017

I'm going to close this as not an issue for now. But if you find it's still an issue using the queue scheduler, please reopen.

Thanks for reporting this and caring about the community and the library, @janmarthedal!!

@benlesh benlesh closed this as completed Feb 10, 2017
@janmarthedal
Author

Fixed with release 5.1.1 (probably due to the fix of #2360).
