From 79a8891929f5692a6eb4e969c923a12887b88d80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kat=20March=C3=A1n?= Date: Sat, 4 Mar 2017 15:36:54 -0800 Subject: [PATCH] fix(perf): use bulk file reads for index reads --- lib/entry-index.js | 106 +++++++++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 52 deletions(-) diff --git a/lib/entry-index.js b/lib/entry-index.js index 8ee2489a..8f14c318 100644 --- a/lib/entry-index.js +++ b/lib/entry-index.js @@ -6,14 +6,13 @@ const crypto = require('crypto') const fixOwner = require('./util/fix-owner') const fs = require('graceful-fs') const path = require('path') -const pipe = require('mississippi').pipe const Promise = require('bluebird') -const split = require('split') const through = require('mississippi').through const indexV = require('../package.json')['cache-version'].index const appendFileAsync = Promise.promisify(fs.appendFile) +const readFileAsync = Promise.promisify(fs.readFile) module.exports.insert = insert function insert (cache, key, digest, opts) { @@ -50,32 +49,20 @@ function insert (cache, key, digest, opts) { module.exports.find = find function find (cache, key) { const bucket = bucketPath(cache, key) - const stream = fs.createReadStream(bucket) - let ret - return Promise.fromNode(cb => { - pipe(stream, split('\n', null, {trailing: true}).on('data', function (l) { - const pieces = l.split('\t') - if (!pieces[1] || pieces[1].length !== parseInt(pieces[0], 10)) { - // Length is no good! Corruption ahoy! - return - } - let obj - try { - obj = JSON.parse(pieces[1]) - } catch (e) { - // Entry is corrupted! - return - } - if (obj && (obj.key === key)) { - ret = formatEntry(cache, obj) - } - }), function (err) { - if (err && err.code === 'ENOENT') { - cb(null, null) + return bucketEntries(cache, bucket).then(entries => { + return entries.reduce((latest, next) => { + if (next && next.key === key) { + return formatEntry(cache, next) } else { - cb(err, ret) + return latest } - }) + }, null) + }).catch(err => { + if (err.code === 'ENOENT') { + return null + } else { + throw err + } }) } @@ -102,38 +89,27 @@ function lsStream (cache) { return cb(err) } else { asyncMap(files, function (f, cb) { - fs.readFile(path.join(indexDir, bucket, f), 'utf8', function (err, data) { - if (err) { return cb(err) } - const entries = {} - data.split('\n').slice(1).forEach(function (entry) { - const pieces = entry.split('\t') - if (pieces[1].length !== parseInt(pieces[0], 10)) { - // Length is no good! Corruption ahoy! - return - } - let parsed - try { - parsed = JSON.parse(pieces[1]) - } catch (e) { - } - // NOTE - it's possible for an entry to be - // incomplete/corrupt. So we just skip it. - // See comment on `insert()` for deets. - if (parsed) { - entries[parsed.key] = formatEntry(cache, parsed) - } - }) + const bpath = path.join(indexDir, bucket, f) + bucketEntries(cache, bpath).then(_entries => { + const entries = _entries.reduce((acc, entry) => { + acc[entry.key] = entry + return acc + }, {}) Object.keys(entries).forEach(function (k) { - stream.write(entries[k]) + stream.write(formatEntry(cache, entries[k])) }) cb() + }, err => { + if (err.code === 'ENOENT') { + cb() + } else { + cb(err) + } }) - }, function (err) { - cb(err) - }) + }, cb) } }) - }, err => { + }, function (err) { if (err) { stream.emit('error') } stream.end() }) @@ -163,6 +139,32 @@ function notFoundError (cache, key) { return err } +function bucketEntries (cache, bucket, filter) { + return readFileAsync( + bucket, 'utf8' + ).then(data => { + let entries = [] + data.split('\n').forEach(entry => { + const pieces = entry.split('\t') + if (!pieces[1] || pieces[1].length !== parseInt(pieces[0], 10)) { + // Length is no good! Corruption ahoy! + return + } + let obj + try { + obj = JSON.parse(pieces[1]) + } catch (e) { + // Entry is corrupted! + return + } + if (obj) { + entries.push(obj) + } + }) + return entries + }) +} + function bucketDir (cache) { return path.join(cache, `index-v${indexV}`) }