fix: async CAR put() & close() for backpressure & error handling #74
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Ref: #73 /cc @jimpick
Bear with me ... This is because of the challenges of using asynciterables as streams, but we can do this!
In the current form, the promises for
writer.put()
andwriter.close()
get lost, so (a) we have no means of backpressure so if we had a fast block store + a large number of blocks and a slow reader for the output then we'd use up a lot of memory, and (b) any errors are lost to the wind.The CAR internals for
{writer,out}
("IteratorChannel") are careful to maintain backpressure between them if you choose to useawait
on yourwriter.put()
(because it's not strictly necessary), so the approach in this PR should maintain a steady stream state with not excessive bytes accumulating if you have a slow writer on the other end (it would be interesting to test this assertion, we're getting into complicated territory here).So, we set up 2 parallel activities: (1) writing blocks from the blockstore to the CAR writer and (2) reading blocks from the CAR output channel. We want to give the user the results of (2) and we don't want (1) to start until the user starts reading and we don't want blocks to be written until the bytes for previous blocks have been consumed. CAR internals take care of that latter concern as long as we can
await
on ourwriter.put()
calls and waiting until the user calls[Symbol.asyncIterator]()
means we can defer the blockstore read->CAR write until they start iterating. We also get to hang on to the promise where we do all the writing and byawait
ing that at the last we get to propagate errors.