diff --git a/src/index.js b/src/index.js index ac90c95..327f739 100644 --- a/src/index.js +++ b/src/index.js @@ -1,50 +1,64 @@ import {subject, holdSubject} from 'most-subject' -const makeSinkProxies = drivers => - Object.keys(drivers) - .reduce((sinkProxies, driverName) => { - sinkProxies[driverName] = holdSubject() - return sinkProxies - }, {}) +function makeSinkProxies(drivers) { + const sinkProxies = {} + const keys = Object.keys(drivers) + for (let i = 0; i < keys.length; i++) { + sinkProxies[keys[i]] = holdSubject(1) + } + return sinkProxies +} -const callDrivers = (drivers, sinkProxies) => - Object.keys(drivers) - .reduce((sources, driverName) => { - sources[driverName] = - drivers[driverName](sinkProxies[driverName].stream, driverName) - return sources - }, {}) +function callDrivers(drivers, sinkProxies) { + const sources = {} + const keys = Object.keys(drivers) + for (let i = 0; i < keys.length; i++) { + let name = keys[i] + sources[name] = drivers[name](sinkProxies[name].stream, name) + } + return sources +} -const runMain = (main, sources, disposableStream) => { - const sinks = main(sources) - return Object.keys(sinks) - .reduce((accumulator, driverName) => { - accumulator[driverName] = sinks[driverName].until(disposableStream) - return accumulator - }, {}) +function makeHandleError(observer, onError) { + return function handleError(err) { + observer.error(err) + onError(err) + } } -const logErrorToConsole = err => { - if (console && console.error) { - console.error(err.message) +function replicateMany({sinks, sinkProxies, disposableStream, onError}) { + const sinkKeys = Object.keys(sinks) + for (let i = 0; i < sinkKeys.length; i++) { + let name = sinkKeys[i] + if (sinkProxies.hasOwnProperty(name)) { + let {observer} = sinkProxies[name] + sinks[name] + .until(disposableStream) + .observe(observer.next) + .then(observer.complete) + .catch(makeHandleError(observer, onError)) + } } } -const replicateMany = (sinks, sinkProxies) => - setTimeout(() => { - Object.keys(sinks) - .filter(driverName => sinkProxies[driverName]) - .forEach(driverName => { - sinks[driverName] - .forEach(sinkProxies[driverName].sink.add) - .then(sinkProxies[driverName].sink.end) - .catch(logErrorToConsole) - }) - }, 1) +function assertSinks(sinks) { + const keys = Object.keys(sinks) + for (let i = 0; i < keys.length; i++) { + if (!sinks[keys[i]] || typeof sinks[keys[i]].observe !== `function`) { + throw new Error(`Sink '${keys[i]}' must be a most.Stream`) + } + } + return sinks +} -const isObjectEmpty = object => Object.keys(object).length <= 0 +const logErrorToConsole = typeof console !== `undefined` && console.error ? + error => { console.error(error.stack || error) } : Function.prototype -const run = (main, drivers) => { +const defaults = { + onEror: logErrorToConsole, +} + +function runInputGuard({main, drivers, onError}) { if (typeof main !== `function`) { throw new Error(`First argument given to run() must be the ` + `'main' function.`) @@ -53,20 +67,27 @@ const run = (main, drivers) => { throw new Error(`Second argument given to run() must be an ` + `object with driver functions as properties.`) } - if (isObjectEmpty(drivers)) { + if (!Object.keys(drivers).length) { throw new Error(`Second argument given to run() must be an ` + `object with at least one driver function declared as a property.`) } - const {sink: disposableSink, stream: disposableStream} = subject() - const sinkProxies = makeSinkProxies(drivers, disposableStream) + + if (typeof onError !== `function`) { + throw new Error(`onError must be a function`) + } +} + +function run(main, drivers, {onError = logErrorToConsole} = defaults) { + runInputGuard({main, drivers, onError}) + const {observer: disposableObserver, stream: disposableStream} = subject() + const sinkProxies = makeSinkProxies(drivers) const sources = callDrivers(drivers, sinkProxies) - const sinks = runMain(main, sources, disposableStream) - replicateMany(sinks, sinkProxies) + const sinks = assertSinks(main(sources)) + replicateMany({sinks, sinkProxies, disposableStream, onError}) - const dispose = () => { - disposableSink.add(1) - Object.keys(sinkProxies).forEach(key => sinkProxies[key].sink.end()) - disposableSink.end() + function dispose() { + disposableObserver.next(1) + disposableObserver.complete() } return {sinks, sources, dispose}