From 1eb1d12a22589627c056013744a2238648266617 Mon Sep 17 00:00:00 2001 From: Adrian Ottiker Date: Mon, 13 Feb 2017 21:12:53 +0100 Subject: [PATCH 1/4] add data stream to handler args --- index.js | 4 ++-- lib/json.js | 4 ++-- lib/mask.js | 2 +- lib/transform.js | 6 +++--- lib/translate.js | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/index.js b/index.js index cabb2be..59d1a7c 100644 --- a/index.js +++ b/index.js @@ -10,7 +10,7 @@ var slice = require('./lib/slice'); var emit = require('./lib/emit'); //var join = require('./lib/join'); -exports.deep = function (scope, inst, args, data, next) { +exports.deep = function (scope, inst, args, data, stream, next) { data = libob.deep(data); @@ -21,7 +21,7 @@ exports.deep = function (scope, inst, args, data, next) { next(null, data); }; -exports.flat = function (scope, inst, args, data, next) { +exports.flat = function (scope, inst, args, data, stream, next) { data = libob.flat(data); diff --git a/lib/json.js b/lib/json.js index 9ba57c8..6d9a103 100644 --- a/lib/json.js +++ b/lib/json.js @@ -9,7 +9,7 @@ var qs = require('qs'); * @param {object} The data object. * @param {function} The next handler. */ -exports.parse = function (scope, inst, options, data, next) { +exports.parse = function (scope, inst, options, data, stream, next) { var error; onKey(options, data, function (key, value, options) { if (!error) { @@ -58,7 +58,7 @@ exports.parse = function (scope, inst, options, data, next) { * @param {object} The data object. * @param {function} The next handler. */ -exports.stringify = function (scope, inst, options, data, next) { +exports.stringify = function (scope, inst, options, data, stream, next) { var error; onKey(options, data, function (key, value, options) { diff --git a/lib/mask.js b/lib/mask.js index 706ce13..94f3782 100644 --- a/lib/mask.js +++ b/lib/mask.js @@ -8,7 +8,7 @@ var libob = require('libobject'); * @param {object} The data object. * @param {function} The next handler. */ -module.exports = function (scope, inst, options, data, next) { +module.exports = function (scope, inst, options, data, stream, next) { /* TODO create docs Config example diff --git a/lib/transform.js b/lib/transform.js index a9f5fc7..e08910d 100644 --- a/lib/transform.js +++ b/lib/transform.js @@ -5,7 +5,7 @@ const libob = require('libobject'); /* Arguments: { "flat.key": "{field}" }*/ -exports.transform = function (scope, state, args, data, next) { +exports.transform = function (scope, state, args, data, stream, next) { libob.change(args, data, data); next(null, data); }; @@ -13,7 +13,7 @@ exports.transform = function (scope, state, args, data, next) { /* Arguments: ["dd|ss|ds|sd|ed|es", { "flat.key": "{field}" }]*/ -exports.transform2 = function (scope, state, args, data, next) { +exports.transform2 = function (scope, state, args, data, stream, next) { if (!(args instanceof Array)) { return next(new Error('Flow-tools.transform2: Invalid arguments.')) @@ -49,7 +49,7 @@ exports.transform2 = function (scope, state, args, data, next) { "flat.key": "{field}", "flat.key": ["{get.path.from.data}"i] ?? }*/ -exports.env_transform = function (scope, state, args, data, next) { +exports.env_transform = function (scope, state, args, data, stream, next) { let deep; for (let key in args) { diff --git a/lib/translate.js b/lib/translate.js index 426bf1c..824c53d 100644 --- a/lib/translate.js +++ b/lib/translate.js @@ -1,6 +1,6 @@ var libob = require('libobject'); -module.exports = function (scope, inst, options, data, next) { +module.exports = function (scope, inst, options, data, stream, next) { /* TODO create docs Config example From b9b7a8d2ae61fe4b71aff1b5bb88612d0c342039 Mon Sep 17 00:00:00 2001 From: Adrian Ottiker Date: Thu, 23 Feb 2017 21:34:25 +0100 Subject: [PATCH 2/4] :q --- lib/emit.js | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/lib/emit.js b/lib/emit.js index 007560e..4f8d3d6 100644 --- a/lib/emit.js +++ b/lib/emit.js @@ -1,11 +1,25 @@ "use strict" -module.exports = function (scope, state, args, data, next) { - let emit = data.emit || args.emit; - if (!emit) { - return next(new Error('Flow-tools.emit: Event not found.')); +// TODO create a suite to route flow sequences + +module.exports = function (scope, state, args = {}, data, stream, next) { + + /*{ + seqId: data.seqId (string) + package: data.data (object) + stream: data.stream (bool) + }*/ + + // call flow sequnce + let options; + if (typeof stream.pipe === "function") { + options = {objectMode: args.objectMode || stream._readableState.objectMode}; } - data.emit = scope.flow(emit).end(data); - next(null, data); + const flow = scope.flow(args.seq || data.seq, args.data || data, options, next); + flow.on('error', next); + stream.done = next; + if (flow.pipe) { + stream = stream.pipe(flow); + } }; From 25529317b7cc38540a7fd5133f332a8865853919 Mon Sep 17 00:00:00 2001 From: Adrian Ottiker Date: Mon, 13 Mar 2017 19:27:51 +0100 Subject: [PATCH 3/4] update to new flow version --- lib/emit.js | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/lib/emit.js b/lib/emit.js index 4f8d3d6..f424681 100644 --- a/lib/emit.js +++ b/lib/emit.js @@ -2,24 +2,14 @@ // TODO create a suite to route flow sequences -module.exports = function (scope, state, args = {}, data, stream, next) { - - /*{ - seqId: data.seqId (string) - package: data.data (object) - stream: data.stream (bool) - }*/ +module.exports = function (event, state, args, next) { // call flow sequnce let options; - if (typeof stream.pipe === "function") { - options = {objectMode: args.objectMode || stream._readableState.objectMode}; + if (typeof event.output.pipe === "function") { + options = {objectMode: args.objectMode};// || stream._i._readableState.objectMode}; } - const flow = scope.flow(args.seq || data.seq, args.data || data, options, next); - flow.on('error', next); - stream.done = next; - if (flow.pipe) { - stream = stream.pipe(flow); - } + data.seq = args.seq || data.seq; + next(null, data, event.output.pipe(scope.flow(data.seq, args.data || data, options))); }; From 4f1a8f25cc3ff2f2e27f36722fd16bb99567f674 Mon Sep 17 00:00:00 2001 From: Dan Andrei Date: Mon, 13 Mar 2017 20:51:38 +0100 Subject: [PATCH 4/4] Module is registry ready. --- index.js | 45 ++++++++---------------- lib/emit.js | 15 -------- lib/join.js | 4 +-- lib/json.js | 40 +++++++++++----------- lib/logic.js | 14 ++++---- lib/mask.js | 20 ++++++----- lib/slice.js | 12 +++---- lib/transform.js | 89 +++++++++++++++--------------------------------- lib/translate.js | 26 +++++++------- 9 files changed, 104 insertions(+), 161 deletions(-) delete mode 100644 lib/emit.js diff --git a/index.js b/index.js index 59d1a7c..1a2215f 100644 --- a/index.js +++ b/index.js @@ -1,35 +1,20 @@ -'use strict' - -var libob = require('libobject'); -var transform = require('./lib/transform'); -var translate = require('./lib/translate'); -var logic = require('./lib/logic'); -var mask = require('./lib/mask'); -var json = require('./lib/json'); -var slice = require('./lib/slice'); -var emit = require('./lib/emit'); -//var join = require('./lib/join'); - -exports.deep = function (scope, inst, args, data, stream, next) { - - data = libob.deep(data); - - if (data instanceof Error) { - return next(data); - } - - next(null, data); +'use strict'; + +const libob = require('libobject'); +const transform = require('./lib/transform'); +const translate = require('./lib/translate'); +const logic = require('./lib/logic'); +const mask = require('./lib/mask'); +const json = require('./lib/json'); +const slice = require('./lib/slice'); +//const join = require('./lib/join'); + +exports.deep = (data) => { + return libob.deep(data); }; -exports.flat = function (scope, inst, args, data, stream, next) { - - data = libob.flat(data); - - if (data instanceof Error) { - return next(data); - } - - next(null, data); +exports.flat = (data) => { + return libob.flat(data); }; exports.emit = emit; diff --git a/lib/emit.js b/lib/emit.js deleted file mode 100644 index f424681..0000000 --- a/lib/emit.js +++ /dev/null @@ -1,15 +0,0 @@ -"use strict" - -// TODO create a suite to route flow sequences - -module.exports = function (event, state, args, next) { - - // call flow sequnce - let options; - if (typeof event.output.pipe === "function") { - options = {objectMode: args.objectMode};// || stream._i._readableState.objectMode}; - } - - data.seq = args.seq || data.seq; - next(null, data, event.output.pipe(scope.flow(data.seq, args.data || data, options))); -}; diff --git a/lib/join.js b/lib/join.js index 2e845d9..5af852b 100644 --- a/lib/join.js +++ b/lib/join.js @@ -1,3 +1,3 @@ -module.exports = function (scope, inst, args, data, next) { +module.exports = () => { // TODO how to know, when to call next? -}; +}; \ No newline at end of file diff --git a/lib/json.js b/lib/json.js index 6d9a103..cdb576e 100644 --- a/lib/json.js +++ b/lib/json.js @@ -1,5 +1,7 @@ -var libob = require('libobject'); -var qs = require('qs'); +'use strict'; + +const libob = require('libobject'); +const qs = require('qs'); /** * Parse data or part of data @@ -7,11 +9,11 @@ var qs = require('qs'); * @public * @param {object} The options object. * @param {object} The data object. - * @param {function} The next handler. + * @param {function} The callback. */ -exports.parse = function (scope, inst, options, data, stream, next) { - var error; - onKey(options, data, function (key, value, options) { +exports.parse = (options, data, callback) => { + let error; + onKey(options, data, (key, value, options) => { if (!error) { try { @@ -47,8 +49,8 @@ exports.parse = function (scope, inst, options, data, stream, next) { } }); - next(error, data); -} + callback(error, data); +}; /** * Stringify data or part of data @@ -56,18 +58,18 @@ exports.parse = function (scope, inst, options, data, stream, next) { * @public * @param {object} The options object. * @param {object} The data object. - * @param {function} The next handler. + * @param {function} The callback. */ -exports.stringify = function (scope, inst, options, data, stream, next) { +exports.stringify = (options, data, callback) => { - var error; - onKey(options, data, function (key, value, options) { + let error; + onKey(options, data, (key, value, options) => { if (!error) { options = options || {}; try { value = JSON.stringify(value, null, options.space); if (key === null) { - data = value; + data = value; } else { // TODO this is a hack! @@ -84,9 +86,9 @@ exports.stringify = function (scope, inst, options, data, stream, next) { } } }); - - next(error, data); -} + + callback(error, data); +}; /** * stringify based on path @@ -101,8 +103,8 @@ function onKey (keys, object, handler) { return handler(null, object); } - keys.forEach(function (key) { - var options = null; + keys.forEach(key => { + let options = null; if (key instanceof Array) { options = key[1]; key = key[0]; @@ -110,4 +112,4 @@ function onKey (keys, object, handler) { handler(key, libob.path.get(key, object), options); }); -} +} \ No newline at end of file diff --git a/lib/logic.js b/lib/logic.js index d7d26a7..60e51d5 100644 --- a/lib/logic.js +++ b/lib/logic.js @@ -1,10 +1,12 @@ -var libob = require('libobject'); -var jsonLogic = require('json-logic-js'); +'use strict'; -module.exports = function (scope, inst, options, data, next) { +const libob = require('libobject'); +const jsonLogic = require('json-logic-js'); + +module.exports = (options, data, callback) => { if (!libob.isObject(options) || !libob.isObject(data)) { - return next(new Error('Flow-tools.logic: Options or data is not an object.')); + return callback(new Error('Flow-tools.logic: Options or data is not an object.')); } // do logic for multiple keys @@ -12,5 +14,5 @@ module.exports = function (scope, inst, options, data, next) { data[key] = jsonLogic.apply(options[key], data); }); - next(null, data); -}; + callback(null, data); +}; \ No newline at end of file diff --git a/lib/mask.js b/lib/mask.js index 94f3782..b997ac9 100644 --- a/lib/mask.js +++ b/lib/mask.js @@ -1,4 +1,6 @@ -var libob = require('libobject'); +'use strict'; + +const libob = require('libobject'); /** * Add a mask on top of the data chunk. @@ -6,9 +8,9 @@ var libob = require('libobject'); * @public * @param {object} The options object. * @param {object} The data object. - * @param {function} The next handler. + * @param {function} The callback function. */ -module.exports = function (scope, inst, options, data, stream, next) { +module.exports = (options, data, callback) => { /* TODO create docs Config example @@ -21,11 +23,11 @@ module.exports = function (scope, inst, options, data, stream, next) { */ if (!libob.isObject(options) || typeof data !== 'object') { - return next(new Error('Flow-Tools.mask: Config or data is not an object.')); + return callback(new Error('Flow-Tools.mask: Config or data is not an object.')); } - next(null, build(data, options)); -} + callback(null, build(data, options)); +}; /** * Build new object using mask @@ -35,9 +37,9 @@ module.exports = function (scope, inst, options, data, stream, next) { * @param {object} mask The mask used to build the new object. */ function build (obj, mask) { - var newObj = {}; + let newObj = {}; - for (var key in mask) { + for (let key in mask) { if (!mask.hasOwnProperty(key)) continue; if (libob.isObject(mask[key]) && obj[key]) { @@ -48,4 +50,4 @@ function build (obj, mask) { } return newObj; -} +} \ No newline at end of file diff --git a/lib/slice.js b/lib/slice.js index 2d2972e..5682083 100644 --- a/lib/slice.js +++ b/lib/slice.js @@ -1,17 +1,17 @@ -module.exports = function (scope, inst, options, data, next, stream) { +module.exports = (data, stream, callback) => { // data must be an array if (!(data instanceof Array)) { - return next(new Error('Flow-tool.slice: Data chunk must be an array.')); + return callback(new Error('Flow-tool.slice: Data chunk must be an array.')); } if (!data.length) { - return next(new Error('Flow-tool.slice: Data chunk must be a non empty array')); + return callback(new Error('Flow-tool.slice: Data chunk must be a non empty array')); } - var lastItem = data.pop(); - data.forEach(function (item) { + let lastItem = data.pop(); + data.forEach(item => { stream.push(item); }); - next(null, lastItem); + callback(null, lastItem); }; diff --git a/lib/transform.js b/lib/transform.js index e08910d..73ddb55 100644 --- a/lib/transform.js +++ b/lib/transform.js @@ -2,71 +2,36 @@ const libob = require('libobject'); -/* Arguments: { - "flat.key": "{field}" -}*/ -exports.transform = function (scope, state, args, data, stream, next) { - libob.change(args, data, data); - next(null, data); -}; - /* Arguments: ["dd|ss|ds|sd|ed|es", { "flat.key": "{field}" }]*/ -exports.transform2 = function (scope, state, args, data, stream, next) { - - if (!(args instanceof Array)) { - return next(new Error('Flow-tools.transform2: Invalid arguments.')) - } - - switch (args[0]) { - case "dd": - libob.change(args[1], data, data); - break; - case "ss": - libob.change(args[1], state, state); - break; - case "ds": - libob.change(args[1], data, state); - break; - case "sd": - libob.change(args[1], state, data); - break; - case "ed": - libob.change(args[1], scope.env, data); - break; - case "es": - libob.change(args[1], scope.env, state); - break; - default: - return next(new Error('Flow-tools.transform2: Invalid mode "' + args[0] + '"')); - } +exports.transform = (config, source, target) => { - next(null, data); + libob.change(config, source, target); }; -/* Arguments: { - "flat.key": "{field}", - "flat.key": ["{get.path.from.data}"i] ?? -}*/ -exports.env_transform = function (scope, state, args, data, stream, next) { - - let deep; - for (let key in args) { - if (key.indexOf('.') > 0) { - deep = true; - } - - if (typeof args[key] === 'string') { - data[key] = libob.path.get(args[key], scope.env); - } else { - data[key] = libob.path.get(libob.path.get(args[key][1], data), libob.path.get(args[key][0], scope.env), true); - } - } - - if (deep) { - data = libob.deep(data); - } - - next(null, data); -}; +// /* Arguments: { +// "flat.key": "{field}", +// "flat.key": ["{get.path.from.data}"i] ?? +// }*/ +// exports.env_transform = function (scope, state, args, data, stream, next) { + +// let deep; +// for (let key in args) { +// if (key.indexOf('.') > 0) { +// deep = true; +// } + +// if (typeof args[key] === 'string') { +// data[key] = libob.path.get(args[key], scope.env); +// } else { +// data[key] = libob.path.get(libob.path.get(args[key][1], data), libob.path.get(args[key][0], scope.env), true); +// } +// } + +// if (deep) { +// data = libob.deep(data); +// } + +// next(null, data); +// }; diff --git a/lib/translate.js b/lib/translate.js index 824c53d..08bdcd9 100644 --- a/lib/translate.js +++ b/lib/translate.js @@ -1,6 +1,8 @@ -var libob = require('libobject'); +'use strict'; -module.exports = function (scope, inst, options, data, stream, next) { +const libob = require('libobject'); + +module.exports = (config, data) => { /* TODO create docs Config example @@ -14,14 +16,14 @@ module.exports = function (scope, inst, options, data, stream, next) { } */ - if (typeof options !== 'object' || typeof data !== 'object') { - return next(new Error('Flow-Tools.translate: Config or data is not an object.')); + if (typeof config !== 'object' || typeof data !== 'object') { + return new Error('Flow-Tools.translate: Config or data is not an object.'); } - // replace paths from the options object - var value; - var translate = options; - for (var path in translate) { + // replace paths from the config object + let value; + let translate = config; + for (let path in translate) { if (typeof (value = libob.path.get(path, data)) === 'undefined') { continue; @@ -29,7 +31,7 @@ module.exports = function (scope, inst, options, data, stream, next) { if (typeof translate[path][value] !== 'undefined') { if (typeof translate[path][value] === 'object') { - for (var flat in translate[path][value]) { + for (let flat in translate[path][value]) { data[flat] = translate[path][value][flat]; } } else { @@ -38,6 +40,6 @@ module.exports = function (scope, inst, options, data, stream, next) { } } - // unflat data chunk - next(null, libob.deep(data)); -} + // unflat data chunk + return libob.deep(data); +};