From 504e535f0cb16ed92b54783bd522654931b3ebb5 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Sat, 7 Aug 2021 14:06:22 +1000 Subject: [PATCH] fix: async CAR put() & close() for backpressure & error handling --- src/pack/index.ts | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/src/pack/index.ts b/src/pack/index.ts index 2a8d3db..6025f50 100644 --- a/src/pack/index.ts +++ b/src/pack/index.ts @@ -45,17 +45,39 @@ export async function pack ({ input, blockstore: userBlockstore, hasher, maxChun } const root = rootEntry.cid + const { writer, out: carOut } = await CarWriter.create([root]) + const carOutIter = carOut[Symbol.asyncIterator]() - const { writer, out } = await CarWriter.create([root]) - - for await (const block of blockstore.blocks()) { - writer.put(block) + let writingPromise: Promise + const writeAll = async () => { + for await (const block of blockstore.blocks()) { + // `await` will block until all bytes in `carOut` are consumed by the user + // so we have backpressure here + await writer.put(block) + } + await writer.close() + if (!userBlockstore) { + await blockstore.close() + } } - writer.close() - - if (!userBlockstore) { - await blockstore.close() + const out: AsyncIterable = { + [Symbol.asyncIterator] () { + if (writingPromise != null) { + throw new Error('Multiple iterator not supported') + } + // don't start writing until the user starts consuming the iterator + writingPromise = writeAll() + return { + async next () { + const result = await carOutIter.next() + if (result.done) { + await writingPromise // any errors will propagate from here + } + return result + } + } + } } return { root, out }