Skip to content

Commit

Permalink
feat: replace all streams with Minipass streams
Browse files Browse the repository at this point in the history
BREAKING CHANGE: This subtly changes the streaming interface of
everything in cacache that streams, which is, well, everything in
cacache.  Most users will probably not notice, but any code that
depended on stream behavior always being deferred until next tick will
need to adjust.

The mississippi methods 'to', 'from', 'through', and so on, have been
replaced with their Minipass counterparts, and streaming interaction
with the file system is done via fs-minipass.

The following modules are of interest here:

- [minipass](http://npm.im/minipass) The core stream library.

- [fs-minipass](http://npm.im/fs-minipass)  Note that the 'WriteStream'
  class from fs-minipass is _not_ a Minipass stream, but rather a plain
  old EventEmitter that duck types as a Writable.

- [minipass-collect](http://npm.im/minipass-collect) Gather up all the
  data from a stream.  Cacache only uses Collect.PassThrough, which is a
  basic Minipass passthrough stream which emits a 'collect' event with
  the completed data just before the 'end' event.

- [minipass-pipeline](http://npm.im/minipass-pipeline) Connect one or
  more streams into a pipe chain.  Errors anywhere in the pipeline are
  proxied down the chain and then up to the Pipeline object itself.
  Writes go into the head, reads go to the tail.  Used in place of
  pump() and pumpify().

- [minipass-flush](http://npm.im/minipass-flush) A Minipass passthrough
  stream that defers its 'end' event until after a flush() method has
  completed (either calling the supplied callback, or returning a
  promise).  Use in place of flush-write-stream (aka mississippi.to).

Streams from through2, concat-stream, and the behavior provided by
end-of-stream are all implemented in Minipass itself.

Features of interest to cacache, which make Minipass a particularly good
fit:

- All of the 'endish' events are normalized, so we can just listen on
  'end' and know that finish, prefinish, and close will be handled as
  well.
- Minipass doesn't waste time [containing
  zalgo](https://blog.izs.me/2013/08/designing-apis-for-asynchrony).
- Minipass has built-in support for promises that indicate the end or
  error: stream.promise(), stream.collect(), and stream.concat().
- With reliable and consistent timing guarantees, much less
  error-checking logic is required.  We can be more confident that an
  error is being thrown or emitted in the correct place, rather than in
  a callback which is deferred, resulting in a hung promise or
  uncaughtException.

The biggest downside of Minipass is that it lacks some of the internal
characteristics of node-core streams, which many community modules use
to identify streams.  They have no _writableState or _readableState
objects, or _read or _write methods.  As a result, the is-stream module
(at least, at the time of this commit) doesn't recognize Minipass
streams as readable or writable streams.

All in all, the changes required of downstream users should be minimal,
but are unlikely to be zero.  Hence the semver major change.
  • Loading branch information
isaacs authored and ruyadorno committed Sep 24, 2019
1 parent a6545a9 commit f4c0962
Show file tree
Hide file tree
Showing 19 changed files with 474 additions and 776 deletions.
111 changes: 47 additions & 64 deletions get.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ const util = require('util')

const figgyPudding = require('figgy-pudding')
const fs = require('fs')
const { pipe, pipeline, through } = require('mississippi')
const index = require('./lib/entry-index')
const memo = require('./lib/memoization')
const read = require('./lib/content/read')

const Minipass = require('minipass')
const Collect = require('minipass-collect')
const Pipeline = require('minipass-pipeline')

const writeFile = util.promisify(fs.writeFile)

const GetOpts = figgyPudding({
Expand Down Expand Up @@ -120,43 +123,30 @@ function getDataSync (byDigest, cache, key, opts) {

module.exports.stream = getStream

// Build a stream for a fully-memoized cache entry.  The entry's data is
// already in memory, so we wrap it in a Minipass stream and end it
// immediately.  To mirror the non-memoized path — which emits
// 'metadata', 'integrity', and 'size' events — we hook 'newListener'
// so any listener attached for those events is invoked right away with
// the memoized value.
const getMemoizedStream = (memoized) => {
  const stream = new Minipass()
  stream.on('newListener', (ev, listener) => {
    switch (ev) {
      case 'metadata':
        return listener(memoized.entry.metadata)
      case 'integrity':
        return listener(memoized.entry.integrity)
      case 'size':
        return listener(memoized.entry.size)
    }
  })
  stream.end(memoized.data)
  return stream
}

function getStream (cache, key, opts) {
opts = GetOpts(opts)
const stream = through()
const memoized = memo.get(cache, key, opts)
if (memoized && opts.memoize !== false) {
stream.on('newListener', function (ev, cb) {
ev === 'metadata' && cb(memoized.entry.metadata)
ev === 'integrity' && cb(memoized.entry.integrity)
ev === 'size' && cb(memoized.entry.size)
})
stream.write(memoized.data, () => stream.end())
return stream
return getMemoizedStream(memoized)
}

const stream = new Pipeline()
index
.find(cache, key)
.then((entry) => {
if (!entry) {
return stream.emit('error', new index.NotFoundError(cache, key))
}
let memoStream
if (opts.memoize) {
const memoData = []
let memoLength = 0
memoStream = through(
(c, en, cb) => {
memoData && memoData.push(c)
memoLength += c.length
cb(null, c, en)
},
(cb) => {
memoData &&
memo.put(cache, entry, Buffer.concat(memoData, memoLength), opts)
cb()
}
)
} else {
memoStream = through()
throw new index.NotFoundError(cache, key)
}
stream.emit('metadata', entry.metadata)
stream.emit('integrity', entry.integrity)
Expand All @@ -166,19 +156,24 @@ function getStream (cache, key, opts) {
ev === 'integrity' && cb(entry.integrity)
ev === 'size' && cb(entry.size)
})
pipe(
read.readStream(
cache,
entry.integrity,
opts.concat({
size: opts.size == null ? entry.size : opts.size
})
),
memoStream,
stream

const src = read.readStream(
cache,
entry.integrity,
opts.concat({
size: opts.size == null ? entry.size : opts.size
})
)

if (opts.memoize) {
const memoStream = new Collect.PassThrough()
memoStream.on('collect', data => memo.put(cache, entry, data, opts))
stream.unshift(memoStream)
}
stream.unshift(src)
})
.catch((err) => stream.emit('error', err))

return stream
}

Expand All @@ -188,34 +183,22 @@ function getStreamDigest (cache, integrity, opts) {
opts = GetOpts(opts)
const memoized = memo.get.byDigest(cache, integrity, opts)
if (memoized && opts.memoize !== false) {
const stream = through()
stream.write(memoized, () => stream.end())
const stream = new Minipass()
stream.end(memoized)
return stream
} else {
let stream = read.readStream(cache, integrity, opts)
if (opts.memoize) {
const memoData = []
let memoLength = 0
const memoStream = through(
(c, en, cb) => {
memoData && memoData.push(c)
memoLength += c.length
cb(null, c, en)
},
(cb) => {
memoData &&
memo.put.byDigest(
cache,
integrity,
Buffer.concat(memoData, memoLength),
opts
)
cb()
}
)
stream = pipeline(stream, memoStream)
const stream = read.readStream(cache, integrity, opts)
if (!opts.memoize) {
return stream
}
return stream
const memoStream = new Collect.PassThrough()
memoStream.on('collect', data => memo.put.byDigest(
cache,
integrity,
data,
opts
))
return new Pipeline(stream, memoStream)
}
}

Expand Down
30 changes: 12 additions & 18 deletions lib/content/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ const util = require('util')

const figgyPudding = require('figgy-pudding')
const fs = require('graceful-fs')
const pipe = util.promisify(require('mississippi').pipe)
const ssri = require('ssri')
const { PassThrough } = require('stream')
const contentPath = require('./path')
const Pipeline = require('minipass-pipeline')

const lstat = util.promisify(fs.lstat)
const readFile = util.promisify(fs.readFile)
Expand Down Expand Up @@ -58,23 +57,18 @@ module.exports.readStream = readStream

function readStream (cache, integrity, opts) {
opts = ReadOpts(opts)
const stream = new PassThrough()

const stream = new Pipeline()
withContentSri(cache, integrity, (cpath, sri) => {
return lstat(cpath).then((stat) => ({ cpath, sri, stat }))
})
.then(({ cpath, sri, stat }) => {
return pipe(
fs.createReadStream(cpath),
ssri.integrityStream({
integrity: sri,
size: opts.size
}),
stream
)
})
.catch((err) => {
stream.emit('error', err)
})
// just lstat to ensure it exists
return lstat(cpath).then(() => ({ cpath, sri }))
}).then(({ cpath, sri }) => {
stream.push(fs.createReadStream(cpath), ssri.integrityStream({
integrity: sri,
size: opts.size
}))
}, er => stream.emit('error', er))

return stream
}

Expand Down
Loading

0 comments on commit f4c0962

Please sign in to comment.