From c77d794a124dda98a1ebdcba44febaf5f8806da5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kat=20March=C3=A1n?= Date: Wed, 1 Mar 2017 18:48:22 -0800 Subject: [PATCH] feat(get): added memoization support for get --- get.js | 136 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 118 insertions(+), 18 deletions(-) diff --git a/get.js b/get.js index 13f0576..2e6b03b 100644 --- a/get.js +++ b/get.js @@ -4,7 +4,9 @@ const Promise = require('bluebird') const index = require('./lib/entry-index') const finished = Promise.promisify(require('mississippi').finished) +const memo = require('./lib/memoization') const pipe = require('mississippi').pipe +const pipeline = require('mississippi').pipeline const read = require('./lib/content/read') const through = require('mississippi').through @@ -16,39 +18,137 @@ module.exports.byDigest = function getByDigest (cache, digest, opts) { } function getData (byDigest, cache, key, opts) { opts = opts || {} - const src = (byDigest ? getStream.byDigest : getStream)(cache, key, opts) - let data = '' - let meta - src.on('data', function (d) { data += d }) - src.on('metadata', function (m) { meta = m }) - return finished(src).then(() => ({ data, meta })) + opts.hashAlgorithm = opts.hashAlgorithm || 'sha1' + const memoized = ( + byDigest + ? memo.get.byDigest(cache, key, opts.hashAlgorithm) + : memo.get(cache, key) + ) + if (memoized && opts.memoize !== false) { + return Promise.resolve({ + metadata: memoized.entry.metadata, + data: memoized.data, + digest: memoized.entry.digest, + hashAlgorithm: memoized.entry.hashAlgorithm + }) + } + const src = (byDigest ? getStreamDigest : getStream)(cache, key, opts) + let acc = [] + let dataTotal = 0 + let metadata + let digest + let hashAlgorithm + if (!byDigest) { + src.on('digest', d => { + digest = d + }) + src.on('hashAlgorithm', d => { hashAlgorithm = d }) + src.on('metadata', d => { metadata = d }) + } + src.on('data', d => { + acc.push(d) + dataTotal += d.length + }) + return finished(src).then(() => { + const data = Buffer.concat(acc, dataTotal) + return byDigest ? data : { metadata, data, digest, hashAlgorithm } + }) } module.exports.stream = getStream -module.exports.stream.byDigest = read.readStream function getStream (cache, key, opts) { - const stream = through() - index.find(cache, key).catch(err => { - stream.emit('error', err) - }).then(data => { - if (!data) { + opts = opts || {} + let stream = through() + const memoized = memo.get(cache, key) + if (memoized && opts.memoize !== false) { + stream.on('newListener', function (ev, cb) { + ev === 'metadata' && cb(memoized.entry.metadata) + ev === 'digest' && cb(memoized.entry.digest) + ev === 'hashAlgorithm' && cb(memoized.entry.hashAlgorithm) + }) + stream.write(memoized.data, () => stream.end()) + return stream + } + index.find(cache, key).then(entry => { + if (!entry) { return stream.emit( 'error', index.notFoundError(cache, key) ) } - stream.emit('metadata', data) + let memoStream + if (opts.memoize) { + let memoData = [] + let memoLength = 0 + memoStream = through((c, en, cb) => { + memoData && memoData.push(c) + memoLength += c.length + cb(null, c, en) + }, cb => { + memoData && memo.put(cache, entry, Buffer.concat(memoData, memoLength)) + cb() + }) + } else { + memoStream = through() + } + // TODO - don't overwrite someone else's `opts`. + opts.hashAlgorithm = entry.hashAlgorithm + stream.emit('metadata', entry.metadata) + stream.emit('hashAlgorithm', entry.hashAlgorithm) + stream.emit('digest', entry.digest) stream.on('newListener', function (ev, cb) { - ev === 'metadata' && cb(data) + ev === 'metadata' && cb(entry.metadata) + ev === 'digest' && cb(entry.digest) + ev === 'hashAlgorithm' && cb(entry.hashAlgorithm) }) pipe( - read.readStream(cache, data.digest, opts), + read.readStream(cache, entry.digest, opts), + memoStream, stream ) - }) + }, err => stream.emit('error', err)) return stream } +module.exports.stream.byDigest = getStreamDigest +function getStreamDigest (cache, digest, opts) { + opts = opts || {} + opts.hashAlgorithm = opts.hashAlgorithm || 'sha1' + const memoized = memo.get.byDigest(cache, digest, opts.hashAlgorithm) + if (memoized && opts.memoize !== false) { + const stream = through() + stream.write(memoized, () => stream.end()) + return stream + } else { + let stream = read.readStream(cache, digest, opts) + if (opts.memoize) { + let memoData = [] + let memoLength = 0 + const memoStream = through((c, en, cb) => { + memoData && memoData.push(c) + memoLength += c.length + cb(null, c, en) + }, cb => { + memoData && memo.put.byDigest( + cache, + digest, + opts.hashAlgorithm, + Buffer.concat(memoData, memoLength) + ) + cb() + }) + stream = pipeline(stream, memoStream) + } + return stream + } +} + module.exports.info = info -function info (cache, key) { - return index.find(cache, key) +function info (cache, key, opts) { + opts = opts || {} + const memoized = memo.get(cache, key) + if (memoized && opts.memoize !== false) { + return Promise.resolve(memoized.entry) + } else { + return index.find(cache, key) + } }