Breaking: move to JSONStream. remove multibyte option (#6)
* Move to JSONStream under the hood. This means the options exposed by stream-json (the previous streaming implementation) are no longer supported.
* Supports directly calling `write()` on parse streams (see the sketch below).
* Input strings are now chunked up when parsing via the function wrapper.
* The multibyte option is removed, since multibyte characters are already handled by JSONStream.
DonutEspresso authored Nov 17, 2017
1 parent 58aaa92 commit 43344be
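
For illustration, a minimal sketch of the newly supported direct-write usage. This assumes the module's public exports are unchanged (`createParseStream`, as documented in the README diff below) and that the package is installed under its published name `big-json`:

```js
const json = require('big-json');

const parseStream = json.createParseStream();

// the fully reconstructed POJO is emitted as a single 'data' event
parseStream.on('data', function(pojo) {
    console.log(pojo); // => { name: 'big-json' }
});

// pipe() still works, but direct write()/end() is now supported too
parseStream.write('{"name": "big');
parseStream.write('-json"}');
parseStream.end();
```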
Showing 6 changed files with 385 additions and 499 deletions.
Makefile (8 changes: 1 addition & 7 deletions)
````diff
@@ -37,7 +37,7 @@ CHANGELOG := $(TOOLS)/changelog.js
 # Files and globs
 #
 PACKAGE_JSON := $(ROOT)/package.json
-SHRINKWRAP := $(ROOT)/npm-shrinkwrap.json
+PACKAGE_LOCK := $(ROOT)/package-lock.json
 GITHOOKS := $(wildcard $(GITHOOKS_SRC)/*)
 LCOV := $(COVERAGE)/lcov.info
 ALL_FILES := $(shell find $(ROOT) \
@@ -99,13 +99,7 @@ codestyle-fix: $(NODE_MODULES) $(JSCS) $(ALL_FILES) ## Run code style checker wi
 
 .PHONY: nsp
 nsp: $(NODE_MODULES) $(NSP) $(NSP_BADGE) ## Run nsp. Shrinkwraps dependencies, checks for vulnerabilities.
-ifeq ($(wildcard $(SHRINKWRAP)),)
-	@$(NPM) shrinkwrap --dev
 	@($(NSP) check) | $(NSP_BADGE)
-	@rm $(SHRINKWRAP)
-else
-	@($(NSP) check) | $(NSP_BADGE)
-endif
 
 
 .PHONY: prepush
````
README.md (19 changes: 6 additions & 13 deletions)
````diff
@@ -36,8 +36,8 @@ The major caveat is that the reconstructed POJO must be able to fit in memory.
 If the reconstructed POJO cannot be stored in memory, then it may be time to
 reconsider the way these large objects are being transported and processed.
 
-This module currently uses [stream-json](https://github.com/uhop/stream-json/)
-for parsing, and
+This module currently uses
+[JSONStream](https://github.com/dominictarr/JSONStream) for parsing, and
 [json-stream-stringify](https://github.com/Faleij/json-stream-stringify) for
 stringification.
@@ -77,17 +77,10 @@ stringifyStream.on('data', function(strChunk) {
 });
 ```
 
-IMPORTANT: Due to limitations in the implementation, directly calling
-`write()` on the streams may cause unexpected behavior. For maximum
-compatibility, use the Node.js streams `pipe()` method.
-
-
 ## API
 
-### createParseStream(opts)
-
-* `opts` {Object} an options object
-* `opts.multibyte` {Boolean} handle multibyte chars, defaults to true
+### createParseStream()
 
 __Returns__: {Stream} a JSON.parse stream
 
@@ -102,8 +95,8 @@ __Returns__: {Stream} a JSON.stringify stream
 An async JSON.parse using the same underlying stream implementation, but with
 a callback interface.
 
-* `opts` {Object} an options object passed to `createParseStream`
-* `opts.body` {Object} the string to be parsed
+* `opts` {Object} an options object
+* `opts.body` {String} the string to be parsed
 * `callback` {Function} a callback object
 
 __Returns__: {Object} the parsed object
@@ -112,7 +105,7 @@ __Returns__: {Object} the parsed object
 An async JSON.stringify using the same underlying stream implementation, but
 with a callback interface.
 
-* `opts` {Object} an options object passed to `createStringifyStream`
+* `opts` {Object} an options object
 * `opts.body` {Object} the object to be stringified
 * `callback` {Function} a callback object
````
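
A short usage sketch of the callback API as now documented, again assuming the published package name `big-json`:

```js
const json = require('big-json');

// parse: opts.body is the raw JSON string
json.parse({ body: '{"a": [1, 2, 3]}' }, function(err, pojo) {
    console.log(err || pojo.a); // => [ 1, 2, 3 ]
});

// stringify: opts.body is the POJO to serialize
json.stringify({ body: { a: [1, 2, 3] } }, function(err, str) {
    console.log(err || str); // => {"a":[1,2,3]}
});
```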
lib/index.js (140 changes: 38 additions & 102 deletions)
````diff
@@ -4,104 +4,48 @@
 const stream = require('stream');
 
 // external modules
-const Assembler = require('stream-json/utils/Assembler');
 const assert = require('assert-plus');
-const JSONParseStream = require('stream-json/Combo');
-const through2 = require('through2');
+const intoStream = require('into-stream');
+const JSONStream = require('JSONStream');
+const miss = require('mississippi');
+const once = require('once').strict;
 const stringifyStreamFactory = require('json-stream-stringify');
-const utf8Stream = require('utf8-stream');
 
 
 /**
- * Create a JSON.parse that uses a stream interface. The underlying stream-json
- * module used to do the parsing is actually a combo stream that combines 3
- * streams into a singular stream. It's a combo stream in the sense that the
- * code is combined (generated?) into a single stream to avoid overhead of
- * piping together 3 separate streams:
- * * Parser (reading in raw strings)
- * * Streamer (converts token into SAX-like event stream)
- * * Packer (which assembles individual JS primitives from parsed chunks).
- *
- * Finally, an "Assembler" class is used to reconstruct the full POJO by
- * assembling the individual JSON chunks. The caveat is that the process must
- * be able to hold the fully reconstructed object in memory.
- *
- * However, since the assembler is not a stream and works outside the streams
- * themselves, we must create a wrapper stream that that encapsulates the
- * execution of the 3 internal streams plus the assembler. that way external
- * consumers can expect to work with the standard stream events like 'data or
- * 'end'. the wrapper stream can be thought of as an accumulator of the JSON
- * chunks that will emit the fully constructed POJO once it is complete. as an
- * accumulator, certain stream concepts like backpressure don't apply since it
- * isn't emitting any data until the reconstruction of the POJO is complete.
+ * Create a JSON.parse that uses a stream interface. The underlying
+ * implementation is handled by JSONStream. This is merely a thin wrapper for
+ * convenience that handles the reconstruction/accumulation of each
+ * individually parsed field.
  *
  * The advantage of this approach is that by also using a streams interface,
  * any JSON parsing or stringification of large objects won't block the CPU.
  * @public
- * @param {Object} [options] an options object
- * @param {Object} [options.multibyte] when true, support multibyte. defaults
- * to true.
  * @function createParseStream
  * @return {Stream}
  */
-function createParseStream(options) {
-
-    const opts = Object.assign({
-        multibyte: true
-    }, options);
-
-    assert.optionalObject(opts, 'opts');
-    assert.optionalBool(opts.multibyte, 'opts.multibyte');
-
-    const assembler = new Assembler();
-    const parseStream = new JSONParseStream({
-        packKeys: true,
-        packStrings: true,
-        packNumbers: true
-    });
-    const wrapperStream = through2.obj(function(chunk, enc, callback) {
-        this.push(chunk);
-        return callback();
-    });
-    let redirected = false;
-
-    // when a read stream is piped in, redirect data from the wrapper stream to
-    // the real underlying json stream. redirection can only be done at the
-    // time of being hooked up, since we need the source stream.
-    wrapperStream.on('pipe', function wrapperStreamOnPipe(source) {
-        // as this is an accumulator stream, attempting to pipe in more than
-        // one source stream is a user error and should be fatal.
-        if (redirected === true) {
-            throw new Error(
-                'big-json parseStream cannot accept multiple sources!'
-            );
-        }
-
-        source.unpipe(this);
-
-        // pipe the stream, prepend with multibyte if necessary. utf8stream can
-        // never error, so no need to handle errors here.
-        if (opts.multibyte === true) {
-            this.transformStream = source.pipe(utf8Stream()).pipe(parseStream);
-        } else {
-            this.transformStream = source.pipe(parseStream);
-        }
-        redirected = true;
-    });
+function createParseStream() {
 
-    // when the parse stream gets chunks of data, through them into the
-    // assembler. the assembler is basically a pointer to the current
-    // object/scope from which a JSON chunk was parsed.
+    // when the parse stream gets chunks of data, it is an object with key/val
+    // fields. accumulate the parsed fields.
+    const accumulator = {};
+    const parseStream = JSONStream.parse('$*');
+    const wrapperStream = miss.through.obj(
+        function write(chunk, enc, cb) {
+            parseStream.write(chunk);
+            return cb();
+        },
+        function flush(cb) {
+            parseStream.on('end', function() {
+                return cb(null, accumulator);
+            });
+            parseStream.end();
+        }
+    );
+
+    // for each chunk parsed, add it to the accumulator
     parseStream.on('data', function(chunk) {
-        if (assembler[chunk.name]) {
-            assembler[chunk.name](chunk.value);
-        }
-    });
-
-    // on completion of parsing, write the completed pojo to the wrapper stream
-    // and end the stream.
-    parseStream.on('end', function() {
-        wrapperStream.end(assembler.current);
+        accumulator[chunk.key] = chunk.value;
     });
 
     // make sure error is forwarded on to wrapper stream.
````
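
Context for the rewrite above: `JSONStream.parse('$*')` emits one `{key, value}` object per top-level field, so reconstructing the POJO is just a matter of reassigning each pair onto a plain object. A standalone sketch of that behavior, using JSONStream directly rather than through big-json:

```js
const JSONStream = require('JSONStream');

const parseStream = JSONStream.parse('$*');
const accumulator = {};

// each chunk looks like { key: 'name', value: 'big-json' }
parseStream.on('data', function(chunk) {
    accumulator[chunk.key] = chunk.value;
});

parseStream.on('end', function() {
    console.log(accumulator); // => { name: 'big-json', count: 2 }
});

// chunk boundaries don't need to align with JSON tokens
parseStream.write('{"name":"big-json",');
parseStream.write('"count":2}');
parseStream.end();
```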
````diff
@@ -143,22 +87,19 @@ function parse(opts, callback) {
     assert.string(opts.body, 'opts.body');
     assert.func(callback, 'callback');
 
-    const writeStream = through2.obj(function(chunk, enc, cb) {
-        this.push(chunk);
-        return cb();
-    });
-    const parseStream = createParseStream(opts);
+    const sourceStream = intoStream(opts.body);
+    const parseStream = createParseStream();
+    const cb = once(callback);
 
-    parseStream.on('data', function(pojo) {
-        return callback(null, pojo);
+    parseStream.on('data', function(data) {
+        return cb(null, data);
     });
 
     parseStream.on('error', function(err) {
-        return callback(err);
+        return cb(err);
     });
 
-    writeStream.pipe(parseStream);
-    writeStream.end(opts.body);
+    sourceStream.pipe(parseStream);
 }
````
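
The switch to into-stream here is what the commit message means by chunking up input strings: rather than pushing `opts.body` through a passthrough stream in one monolithic write, the string becomes a Readable that the parse stream consumes in bounded chunks. A small sketch; the actual chunk sizes are an implementation detail of the stream's highWaterMark, not a guarantee:

```js
const intoStream = require('into-stream');

const sourceStream = intoStream('{"a":1,"b":2}');

sourceStream.on('data', function(chunk) {
    // chunk is a Buffer; large bodies may arrive across several chunks
    console.log(chunk.toString());
});
```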


````diff
@@ -168,36 +109,31 @@ function parse(opts, callback) {
  * @public
  * @param {Object} opts options to pass to stringify stream
  * @param {Function} callback a callback function
- * @function parse
+ * @function stringify
  * @return {Object} the parsed JSON object
  */
 function stringify(opts, callback) {
     assert.object(opts, 'opts');
     assert.func(callback, 'callback');
 
-    let done = false;
     let stringified = '';
     const stringifyStream = createStringifyStream(opts);
     const passthroughStream = new stream.PassThrough();
+    const cb = once(callback);
 
     // setup the passthrough stream as a sink
     passthroughStream.on('data', function(chunk) {
         stringified += chunk;
     });
 
     passthroughStream.on('end', function() {
-        // if we didn't already error and exit
-        if (done === false) {
-            return callback(null, stringified);
-        }
-        return null;
+        return cb(null, stringified);
     });
 
     // don't know what errors stringify stream may emit, but pass them back
     // up.
     stringifyStream.on('error', function(err) {
-        done = true;
-        return callback(err);
+        return cb(err);
     });
 
     stringifyStream.pipe(passthroughStream);
````
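
The `once(callback)` wrapper (via `require('once').strict` in the new imports) is what lets both rewritten functions drop the old `done` bookkeeping: a strict-once function throws if it is ever invoked a second time, so double-calling the user's callback becomes impossible. A quick sketch:

```js
const once = require('once').strict;

const cb = once(function(err, result) {
    console.log(err || result);
});

cb(null, 'first call wins'); // logs: first call wins
cb(null, 'second call');     // throws: a strict-once function cannot be called twice
```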
