Add a buffering helper #4
I guess you could make a point for both. On the one hand iterators are still pull-based, on the other hand buffering ahead means starting more operations than were requested.
Yes, I think having it as both the upper and lower bound should be the default. There should be other methods (or extra parameters) for setting only the upper or only the lower bound, but I think it's most expected to set both.

```js
for await (const el of asyncIter
  .map(takes20ms).buffer(2)
  .map(takes10ms).buffer(1)
  .map(takes30ms).buffer(3)
) {
  console.log(el);
}
```

That should ideally call the three functions at a rate of 1 per 10ms each. Having to write that as … Always setting a limit would also help to maintain a nice and appropriate backpressure. If you create an iterator with a …
Well, that depends on whether you're thinking of … I think the way I'd want to write your example, where I wanted to limit the degree of concurrency of the … I've thought for a while that we needed a mechanism for limiting concurrency in the standard library (I've mentioned that in #1 as well). If we did have such a thing, so you could write

```js
asyncIter
  .map(limitConcurrency(2, takes20ms))
  .map(limitConcurrency(1, takes10ms))
  .map(takes30ms)
  .buffer(3)
```

would you still think that …
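(For concreteness, `limitConcurrency` could look something like the sketch below — a wrapper that caps how many invocations of a function run at once. This is an illustration only, not a settled design.)

```ts
// Rough sketch only: wrap `fn` so that at most `limit` invocations are running
// at any one time; further calls wait in a FIFO queue until a slot frees up.
function limitConcurrency<Args extends unknown[], R>(
  limit: number,
  fn: (...args: Args) => Promise<R>,
): (...args: Args) => Promise<R> {
  let active = 0;
  const waiting: Array<() => void> = [];
  const acquire = () =>
    new Promise<void>((resolve) => {
      if (active < limit) {
        active++;
        resolve();
      } else {
        waiting.push(() => {
          active++;
          resolve();
        });
      }
    });
  const release = () => {
    active--;
    waiting.shift()?.(); // wake the next waiting caller, if any
  };
  return async (...args) => {
    await acquire();
    try {
      return await fn(...args);
    } finally {
      release();
    }
  };
}
```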
Well, there are multiple things involved. I think ultimately iterators are pull-based, so what we can limit is only the number of concurrent `next()` calls. Now there might be a way to run an async iterator faster than it will be consumed, which leads to buffering responses up to a certain (possibly unbounded) limit. Unfortunately, there is no way to tell how fast an async iterator can produce results if running steps concurrently. Tbh after pondering a bit, I'm not even certain that concurrent calls to `next()` …
That wouldn't be the same. Consider

```js
asyncIter
  .map(tapper(console.log))
  .map(takes10ms).buffer(1)
  .map(takes30ms).buffer(3)
```

vs

```js
asyncIter
  .map(tapper(console.log))
  .map(limitConcurrency(1, takes10ms))
  .map(takes30ms).buffer(3)
```

The second will immediately log 3 times, then buffer the inputs for …
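(Here `tapper` is assumed to be something like the following — a tiny pass-through helper, sketched only for illustration:)

```ts
// Assumed helper, sketched for illustration: returns a mapping function that
// reports each value to `fn` and passes it through unchanged.
function tapper<T>(fn: (value: T) => void): (value: T) => T {
  return (value) => {
    fn(value);
    return value;
  };
}
```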
I'm not sure what this means. An iterator can't produce values which haven't been requested by the consumer - there is, structurally, no mechanism to do so.
My experience of using helpers with those signatures has not been that they are simpler and easier to understand. And they don't do anything for the case where you are writing your own iterable which is capable of producing values concurrently - it means every iterable needs to take a "how many values to buffer" parameter, instead of having concurrency naturally specified based on how many items the consumer is requesting, which is more consistent with being pull-based.
I agree it wouldn't be the same, but it seemed like limiting concurrency of …
Yeah, sorry, I was trying to be deliberately vague since I wanted to refer to all approaches - passing a concurrency value to the function producing the iterator, using your proposed …
Yeah, I can see the appeal in that. But I guess I'm uneasy about how this composes. Let's say I have

```js
function sampleIter() {
  return someAsyncIterator().map(x => …).map(y => …);
}
```

The mapping functions may inadvertently rely on not being invoked concurrently. Now if someone writes …
My goal would be to stream data from/through async iterators, where all the asynchronous processing of the streaming stages runs concurrently - i.e. have each stage be sequential by itself, but have all stages be active at the same time.

Taking `asyncIter.map(takes20ms).map(takes10ms).map(takes30ms)` as an example again, this by default pulls one item from the iterator, takes 60ms to process it, then takes the next, etc. No two pieces of code run concurrently, which is quite a waste if the three callbacks do use different resources and do not contend for them. `asyncIter.map(takes20ms, {buffer: 1}).map(takes10ms, {buffer: 1}).map(takes30ms, {buffer: 1})` should then take an item from the iterator, take 20 ms to process it at the first stage, then already pull the next and process it as well, while the result of the first is being processed at the second stage, etc. This would lead to the last stage pulling (and getting) new items every 30 ms.

I guess the same can be achieved using your approach `asyncIter.map(takes20ms).map(takes10ms).map(takes30ms).bufferAhead(1)` (although that also affects …). … `asyncIter.map(takes20ms, {buffer: 2}).map(takes10ms, {buffer: 1}).map(takes30ms, {buffer: 3})`, that details a concurrency factor for each stage separately.
I agree that in this situation you shouldn't invoke the mapping functions concurrently, but I don't think we should assume this is the expected case. We should instead give users the ability to limit concurrency when they have functions which shouldn't be invoked concurrently. I am maybe convinced of the need to have an explicit method to limit concurrency on an async iterator, though, for the specific case that you want to perform subsequent concurrent operations on top of an async iterator vended by someone else which breaks if you try to consume it concurrently.
How is that better than `asyncIter.map(takes20ms).map(takes10ms).map(takes30ms).bufferAhead(6)`? The latter will start processing items a little bit earlier, but that's arguably desirable, particularly if the times have some noise to them.
Hello. I'd like to point out that … Consider:

```ts
import { asyncBuffer, asyncMap } from "iter-tools";

async function* make() {
  for (let ii = 0;;) {
    const value = ii++;
    console.log(`generated ${value}`);
    yield value;
  }
}

function consume(value: number) {
  console.log(`consumed ${value}`);
  return value;
}

function throwIfGreater(operand: number) {
  return (value: number) => {
    if (value > operand) {
      throw new Error(`${value} is greater than ${operand}`);
    }
    return value;
  };
}

const iter = asyncBuffer(4, make());
const work = asyncMap(consume, iter);
const release = asyncMap(throwIfGreater(2), work);
for await (const result of release) {
  console.log(result);
}
```

The output follows: …
In this case the results from … Expressed another way, this operation accepts ownership of a … An alternative parallelization primitive that I have been using internally is

```ts
function divide<Type>(
  iterable: AsyncIterable<Type>, count: number
): AsyncIterable<Type>[] {
  let current: Promise<IteratorResult<Type>> | undefined;
  let remaining = count;
  const iterator = iterable[Symbol.asyncIterator]();
  const iterables = [ ...Array(count) ].map(async function*() {
    try {
      while (true) {
        current = async function() {
          const previous = await current;
          if (previous?.done) {
            return previous;
          }
          return iterator.next();
        }();
        const result = await current;
        if (result.done) {
          break;
        } else {
          yield result.value;
        }
      }
    } finally {
      if (--remaining === 0) {
        await iterator.return?.();
      }
    }
  });
  return [ ...iterables ];
}
```

We can now rewrite the parallelized operation as:

```ts
const iter = divide(make(), 4);
await Promise.all(iter.map(async function(iter) {
  const work = asyncMap(consume, iter);
  const release = asyncMap(throwIfGreater(2), work);
  for await (const result of release) {
    console.log(result);
  }
}));
```

And the result: … Note that all generated values were consumed; no result has been abandoned.
@laverdet I'm not sure what you mean by "unsafe" or "ownership" here. I think it's expected when buffering that results are produced in advance of being consumed, such that if you stop consuming before the end of the stream you may have produced more results than are ever consumed. That's what buffering means. If you don't want that behavior you can't buffer, but that doesn't make the utility "unsafe"; it just makes it unsuitable for some applications. In any case, the existing behavior of …
That said, I do think the …
Well, I think calling `.next()` concurrently is a mistake and I would hate to see it enshrined in the language specification. It's possible I'm too late to the discussion to make that case though. If an iterable truly supports concurrent execution I would prefer to just have that iterable yield something like …

The problem I have with concurrency implemented in this way is that it actively encourages abandoned values. Abandoned values can result in memory leaks or lost work. In my case when I authored … I can think of other examples: …
This is why I consider …
I agree that using these APIs in situations where you need to dispose of values explicitly would be bad. But I am reluctant to optimize for that (in my experience) quite rare case, given how much worse the API gets when you add the restriction that you can't pull concurrently. And again, I don't think it's any worse than …

That said, there are possible tweaks here which could help - for example, …
Support for calling …
This is a reason that it is important for your use case to consume all the values, but you can call …
Well, a big difference between … In my experience async iterables are very fussy beasts, and getting them right can be challenging. I see concurrent … I actually use a version of …

An example of where I'd use … An example of where I'd use …

Here the lookAhead & divide strategy still achieves tunable parallelization + buffering, but doesn't need to invoke `next()` concurrently.

I disagree with your statement: …
In this example [paged document manifest -> document fetch] each stage must be tuned specifically. It is not enough to tune the pipeline parallelization at the end, because otherwise you will still run into a bottleneck when iterating to a new manifest page. We really do want to specify buffer and parallelization strategies separately for each stage. I believe this will be the case for many non-trivial pipelines. For example, it is ok to fetch 10 images at a time, but we probably only want to transcode 2 images at a time.

Anyway, concurrency is messy, opinionated, and workload-specific. It doesn't seem like the community has experimented too much with any of these patterns, so this part of the proposal seems overly ambitious. I also believe that the failure of the async cancellation proposal from a few years ago kneecapped this proposal from the start. What do we do in this example?

```ts
declare const documents: URL[];
const result = documents.toAsync()
  .map(url => fetch(url))
  .map(response => response.text())
  .bufferAhead(10)
  .some(text => text.includes("marker"));
```

We want to determine if any of these documents contain the text "marker". We want to fetch 10 documents at a time, and we want to cancel any pending fetches when the marker is found in any of them. The …

With this case in mind I see this proposal as a minor quality of life improvement for some basic cases and one-off scripts. For any kind of work in long-running scripts or in the browser I would still want to use a userspace implementation to leverage …
In practice, in my experience, this doesn't happen. So it is hard for me to accept that as a real reason to dislike concurrent `next()`.
That's true, but only because there's practically nothing in the language or any other web specification that deals with async iterables at all. This proposal is when we're building out the very first nontrivial uses and combinators, so of course now is the first time this question is arising.
It gets you the ability to specify an async iterator which can be consumed concurrently, at the consumer's option. I am open to other ways of accomplishing this but have not yet heard one.
Yes, I agree that …

I'm not seeing where you get the "want to specify buffer and parallelization strategies separately for each stage" part from, though. Why is buffering in this case problematic? Are you fetching documents so large they don't fit in memory? I agree this utility is not suitable for that case and am OK with that.
It's true that sometimes when creating an async pull source you want to limit the degree of concurrency that consumers will be able to cause by pulling. But this is just as true with regular async functions (which are also pull sources) as for async iterators: for example, if you are making a "get a thing from this API" function, you probably want to limit how many times that function can be called simultaneously. I've long thought we needed a utility for this (like this popular userland one). But limiting the degree of concurrency is an entirely different thing than specifying the degree of concurrency. If the consumer of the iterator is working very slowly, they might not need to bother with concurrent transcodes, for example - they could just …

The limit on concurrency for each stage is a property of the stage, I agree, but the actual realized amount of concurrency appropriate to each stage depends on the other stages, including the ultimate consumer. So I stand by my statement that you really should be able to worry only about the degree of concurrency appropriate to the entire pipeline, at least in most common cases. I think the example with the …
It will not be possible to experiment with these patterns if we specify …
You should abort when you're done, of course? That gets easier with a disposable AbortController, which is coming along, so I'll use that in this example:

```js
using controller = new AbortController.AutoAbort();
const result = documents.toAsync()
  .map(url => fetch(url, { signal: controller.signal }))
  .map(response => response.text())
  .bufferAhead(10)
  .some(text => text.includes("marker"));
controller.abort();
```

Alternatively, if we have a "run this callback when …" helper:

```js
const controller = new AbortController();
const result = documents.toAsync()
  .map(url => fetch(url, { signal: controller.signal }))
  .map(response => response.text())
  .onClose(() => controller.abort())
  .bufferAhead(10)
  .some(text => text.includes("marker"));
```

I'm not seeing any difficulty here. This is no more a problem than the common … Yes, of course there's no way for …

In any case, it's certainly not a problem if you want to use some other implementation for your particular needs. We're never going to design an API which is both fairly simple and suitable for every use case. "Fairly simple" is a hard constraint, so as long as the design here is suitable for most common cases - which I believe it is - then I'm satisfied. I am open to alternative designs but everything I've seen thus far seems worse.
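(To be clear, `onClose` above is hypothetical - nothing like it is specified anywhere. A rough sketch of the intended shape, written as a standalone wrapper rather than a method:)

```ts
// Hypothetical sketch: wrap an async iterable so `callback` runs exactly once,
// whether the source finishes normally, throws, or is closed early by the
// consumer (e.g. when `some` finds a match and calls `.return()`).
async function* onClose<T>(
  iterable: AsyncIterable<T>,
  callback: () => void,
): AsyncGenerator<T> {
  try {
    for await (const value of iterable) {
      yield value;
    }
  } finally {
    callback();
  }
}
```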
My belief is that the consumer alone doesn't have the information needed to specify concurrency for a given pipeline. Each stage will have unique considerations to take into account. So instead of dropping a …
Well, it's possible to produce the results back into another async iterable when that kind of thing is needed. The technique to accomplish this isn't glamorous though.
Yes, basically! I've done some amazing things with async generators and having a high degree of control over resource ownership has been crucial in that work.
Anyway, the approach to concurrency that this proposal takes implicitly changes the contract of async iterables from "SHOULD handle concurrent invocations of `next()`" to …

Thanks for taking the time to defend your work here. I look forward to seeing how it develops!
The whole point of pull sources is that backpressure is handled by the consumer driving requests. When you specify it at the producer, you have to buffer, since there's nowhere to put those results. Which is fine in some cases, but as you yourself observe, sometimes you don't want a buffer. (Nothing in this proposal would force you to use …)
This is a bit off-topic, but I disagree - when there's multiple consumers, as is often the case (particularly for things like "hit this API"), it's not practical to throttle them independently. The appropriate place to put the limiter is the place which necessitates it, not to force communication among all consumers of that thing.
Eh, I don't think it amounts to a change in contract. Compare again the example of async functions - some of them can be called concurrently, others can't. It's possible to use them in such a way that they'll get called concurrently, but that's true for async iterators today as well. It is fine for an async function to have as part of its contract "cannot be called concurrently", just as it is fine for an async iterator to have as part of its contract "cannot have its `next()` called concurrently".
Yes, thanks for voicing your concerns as well. I'm putting together a presentation for committee about the various tradeoffs here and I'll mention the point you raise about values getting lost.
This would be a great & simple capability for adding in concurrency! I've seen a number of folks struggle with this at my day jobs, and having something in the language would be a huge help to make async iterators really useful out of the box.
I really don't want to get into a position where … I do kind of want an eager mode. One could also imagine a …
Again I think we've happened on another interesting capability (one we probably want even more than …). But users also want to limit concurrency too.
Yeah, my current thinking is that if we want this functionality it should be opt-in via a parameter to …
I'm sorry if I missed this, because it's a big thread. There are really three ways I see people buffer async things: …
The next question becomes whether you want the emission of unfinished buffers on complete to be configurable. For example, if you're buffering 3 items, but the source stream ends before you get to 3, do you emit a buffer with two in it? Or do you just drop them? Or do you error? Those are semantic issues. Regardless, buffering for a specific amount of time is the most often used in RxJS, which will probably have similar use cases to this.
@benlesh I don't think "buffering for an amount of time" means anything with regard to async iterators. Async iterators don't do anything until they're pulled from. The "buffer for a number of things" style helper that we've been discussing in this thread would have a fixed number of slots, immediately pull from the source enough times to fill all of the slots, and as they empty (as the result iterator is pulled), continue to pull from the source to keep those slots full.

What would it mean to pull from the source async iterator for some amount of time? Would this have no parallelism and just pull from the underlying iterator in series until some timer expires and yield a sequence type like an array of the collected values? If that's the thing you're talking about, it seems only very loosely related to what's being proposed here (though arguably a more appropriate use of the term "buffering"). If so, and you think that's a useful helper, we should probably have a separate issue to track that.

edit: Now that I think about it, that sounds like chunking but with chunk size based on number of yielded values over a given time.
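(For concreteness, that reading would be something like the sketch below - an illustration only, not a proposal, with the timer checked after each pull since you can't interrupt an in-flight `next()`:)

```ts
// Illustration only: pull from `source` one value at a time (no concurrency)
// and group values into arrays by elapsed wall-clock time per chunk.
async function* chunkByTime<T>(source: AsyncIterable<T>, ms: number): AsyncGenerator<T[]> {
  let chunk: T[] = [];
  let deadline = Date.now() + ms;
  for await (const value of source) {
    chunk.push(value);
    if (Date.now() >= deadline) {
      yield chunk;
      chunk = [];
      deadline = Date.now() + ms;
    }
  }
  if (chunk.length > 0) {
    yield chunk; // whether to emit, drop, or error on a partial final chunk is the semantic question above
  }
}
```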
Has anyone attempted to build a ponyfill/userland implementation/prototype of this yet? It might be useful to start experimenting with using one and get some feedback on the design.
@felixfbecker is there spec text for anything yet?
@felixfbecker I'm working on one, just haven't had time to polish and ship.

@ljharb No. Spec text is a huge amount of work to do for something for which the design isn't settled yet.
I totally agree, I just mean that an implementation prior to spec text seems premature to me.
I think it's pretty normal to make prototype implementations to experiment with the design? That's explicitly part of the stage 2 process, even. Having a prototype to play around with is helpful in my experience.
Here are some functions I wrote, in case they are helpful starting points. The docstrings reflect my understanding, so if I got something wrong/misleading, would appreciate any corrections! I'm sure there are edge cases not handled, too.

While I was working on this I was also learning more about streams, and ultimately came to the conclusion that I probably would have been better off expressing my workload not as a sequence of applications of iterable transformers, but actually as a pipe chain of streams. IMO it'd be a great contribution to the ecosystem if someone could write up the differences between AsyncIterables and Streams, and when one might want to reach for one vs. the other. That was something I struggled with during this exercise.

```ts
// Applies `mapper` to values yielded by `iter`. Note: Unlike an async generator
// implementation of `map`, this implementation allows multiple values to make
// progress through mapper concurrently--so long as multiple calls to `next` are
// made. For that, see `queueMany`, also in this module.
//
// Polyfill for `map` from
// https://github.com/tc39/proposal-async-iterator-helpers.
export function map<T, V>(iter: AsyncIterable<T>, mapper: (val: T, i: number) => V): AsyncIterable<Awaited<V>> {
  let i = -1;
  let iterator = iter[Symbol.asyncIterator]();
  const result: AsyncIterableIterator<Awaited<V>> = {
    async next() {
      const innerResult = await iterator.next();
      if (innerResult.done) {
        return {
          done: true,
          value: innerResult.value,
        };
      }
      i += 1;
      const value = await mapper(innerResult.value, i);
      return {
        done: false,
        value,
      };
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
  return result;
}

/**
 * Eagerly consumes `iter`. That is, as long as `iter` has not reported that
 * it's done there will always be `concurrency` many pending requests for its
 * next values.
 *
 * Wrapping an iterable with `queueMany` is comparable to piping a ReadableStream
 * into an [identity transform
 * stream](https://streams.spec.whatwg.org/#identity-transform-stream) whose
 * writable side has a `highWaterMark` equal to `Infinity`. That is, `queueMany`
 * wraps `iter` with another iterable who maintains an internal queue, which it
 * eagerly populates with values pulled from `iter`.
 *
 * In runtimes with good support for streams, this iterator adapter is probably
 * unnecessary and streams should be preferred across the board, due to their
 * eagerness.
 */
export function queueMany<T>(iter: AsyncIterable<T>, concurrency: number): AsyncIterable<T> {
  const bufferedInnerResults: ReturnType<AsyncIterator<T>['next']>[] = [];
  let innerDone = false;
  const iterator = iter[Symbol.asyncIterator]();
  function requestAnother() {
    // Any time inner provides another value, we request another if inner is
    // not yet done. This maintains the concurrency level.
    bufferedInnerResults.push(iterator.next().then((result) => {
      if (result.done || innerDone) {
        innerDone = true;
      } else {
        requestAnother();
      }
      return result;
    }));
  }
  while (bufferedInnerResults.length < concurrency) {
    requestAnother();
  }
  const result: AsyncIterableIterator<T> = {
    next() {
      if (bufferedInnerResults.length === 0) {
        requestAnother();
      }
      // SAFETY: The branch above ensures `bufferedInnerResults` is
      // non-empty.
      return bufferedInnerResults.shift()!;
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
  return result;
}
```

In my specific case I wanted to ensure that my long-running async … All that said, though, I was still left thinking this was a clumsy approach, and that I probably wanted to use Streams instead. I'm not 100% sure, though.
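(For reference, here's roughly how the two compose - an illustration only, not taken from the comment above; `fetchItem` and `ids` are just stand-ins for real async work and a real source:)

```ts
// Illustration: `queueMany` keeps up to 4 `next()` calls pending on the mapped
// iterable, so up to 4 `fetchItem` calls can be in flight at once.
const fetchItem = async (id: number): Promise<string> => `item ${id}`;

async function* ids() {
  for (let i = 0; i < 10; i++) yield i;
}

async function run() {
  const mapped = map(ids(), (id) => fetchItem(id));
  for await (const item of queueMany(mapped, 4)) {
    console.log(item);
  }
}
```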
Whoops, I think I'd misunderstood above how the specified … So my …
@bakkot What are your plans for the first parameter? Are you going to try to incorporate the Governor interface we discussed or just accept an integer for now and have that be shorthand for a Semaphore of that capacity in the future?
Just an integer for now, yup.
For the concurrency in the other helpers to be useful, you have to call `.next()` multiple times and buffer the results as they come in. We should have a helper which does that for you, giving you an async generator which reads from the buffer and keeps the buffer full. I'm tentatively calling it `bufferAhead` but open to other names. That would let you do stuff like …

And even without using helpers like `map` you could still make some use of it with raw async generators and other async iterables when the processing of the data you get from the iterable is async, as in …

I note that such a helper exists in the userland iter-tools library, spelled `asyncBuffer`.

Open questions:

- If the consumer calls `.next()` and gets an item which hasn't yet settled, should that trigger another pull from the underlying iterator, or should the parameter to `bufferAhead` serve as an upper bound on the degree of concurrency to request from the underlying iterator (as well as the lower bound it obviously is)? (I lean towards "it should let you pull more things if you explicitly ask for them"; we should have other mechanisms for limiting concurrency.)
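For anyone who wants to experiment before there's a published prototype, a minimal userland sketch of the intended behavior might look like the following. This is an illustration only, not normative: error propagation and the upper-bound question above are only roughly handled, and extra concurrent pulls by the consumer simply queue up rather than triggering more pulls from the source.

```ts
// Minimal sketch (not normative): keep up to `n` pending `next()` calls on
// `source`, yielding the buffered results in order as the consumer pulls.
async function* bufferAhead<T>(source: AsyncIterable<T>, n: number): AsyncGenerator<T> {
  const iterator = source[Symbol.asyncIterator]();
  const buffer: Promise<IteratorResult<T>>[] = [];
  let done = false;

  const pull = () => {
    if (!done) {
      buffer.push(iterator.next().then((result) => {
        if (result.done) done = true;
        return result;
      }));
    }
  };

  try {
    while (buffer.length < n) pull(); // fill the slots
    while (buffer.length > 0) {
      const result = await buffer.shift()!;
      if (result.done) return;
      pull(); // keep the slots full as they empty
      yield result.value;
    }
  } finally {
    await iterator.return?.(); // close the source if the consumer stops early
  }
}
```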