Skip to content

Commit

Permalink
feat(): add optional onError function and assertSinks function
Browse files Browse the repository at this point in the history
Add the option to use your own onError function.
Assert that sinks are being returned as streams
Refactor to use function declarations instead of arrow function expressions
Use for-loops for performance gain.
  • Loading branch information
TylorS committed Mar 3, 2016
1 parent 90bd5f2 commit f942712
Showing 1 changed file with 66 additions and 45 deletions.
111 changes: 66 additions & 45 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,50 +1,64 @@
import {subject, holdSubject} from 'most-subject'

const makeSinkProxies = drivers =>
Object.keys(drivers)
.reduce((sinkProxies, driverName) => {
sinkProxies[driverName] = holdSubject()
return sinkProxies
}, {})
function makeSinkProxies(drivers) {
const sinkProxies = {}
const keys = Object.keys(drivers)
for (let i = 0; i < keys.length; i++) {
sinkProxies[keys[i]] = holdSubject(1)
}
return sinkProxies
}

const callDrivers = (drivers, sinkProxies) =>
Object.keys(drivers)
.reduce((sources, driverName) => {
sources[driverName] =
drivers[driverName](sinkProxies[driverName].stream, driverName)
return sources
}, {})
function callDrivers(drivers, sinkProxies) {
const sources = {}
const keys = Object.keys(drivers)
for (let i = 0; i < keys.length; i++) {
let name = keys[i]
sources[name] = drivers[name](sinkProxies[name].stream, name)
}
return sources
}

const runMain = (main, sources, disposableStream) => {
const sinks = main(sources)
return Object.keys(sinks)
.reduce((accumulator, driverName) => {
accumulator[driverName] = sinks[driverName].until(disposableStream)
return accumulator
}, {})
function makeHandleError(observer, onError) {
return function handleError(err) {
observer.error(err)
onError(err)
}
}

const logErrorToConsole = err => {
if (console && console.error) {
console.error(err.message)
function replicateMany({sinks, sinkProxies, disposableStream, onError}) {
const sinkKeys = Object.keys(sinks)
for (let i = 0; i < sinkKeys.length; i++) {
let name = sinkKeys[i]
if (sinkProxies.hasOwnProperty(name)) {
let {observer} = sinkProxies[name]
sinks[name]
.until(disposableStream)
.observe(observer.next)
.then(observer.complete)
.catch(makeHandleError(observer, onError))
}
}
}

const replicateMany = (sinks, sinkProxies) =>
setTimeout(() => {
Object.keys(sinks)
.filter(driverName => sinkProxies[driverName])
.forEach(driverName => {
sinks[driverName]
.forEach(sinkProxies[driverName].sink.add)
.then(sinkProxies[driverName].sink.end)
.catch(logErrorToConsole)
})
}, 1)
function assertSinks(sinks) {
const keys = Object.keys(sinks)
for (let i = 0; i < keys.length; i++) {
if (!sinks[keys[i]] || typeof sinks[keys[i]].observe !== `function`) {
throw new Error(`Sink '${keys[i]}' must be a most.Stream`)
}
}
return sinks
}

const isObjectEmpty = object => Object.keys(object).length <= 0
const logErrorToConsole = typeof console !== `undefined` && console.error ?
error => { console.error(error.stack || error) } : Function.prototype

const run = (main, drivers) => {
const defaults = {
onEror: logErrorToConsole,
}

function runInputGuard({main, drivers, onError}) {
if (typeof main !== `function`) {
throw new Error(`First argument given to run() must be the ` +
`'main' function.`)
Expand All @@ -53,20 +67,27 @@ const run = (main, drivers) => {
throw new Error(`Second argument given to run() must be an ` +
`object with driver functions as properties.`)
}
if (isObjectEmpty(drivers)) {
if (!Object.keys(drivers).length) {
throw new Error(`Second argument given to run() must be an ` +
`object with at least one driver function declared as a property.`)
}
const {sink: disposableSink, stream: disposableStream} = subject()
const sinkProxies = makeSinkProxies(drivers, disposableStream)

if (typeof onError !== `function`) {
throw new Error(`onError must be a function`)
}
}

function run(main, drivers, {onError = logErrorToConsole} = defaults) {
runInputGuard({main, drivers, onError})
const {observer: disposableObserver, stream: disposableStream} = subject()
const sinkProxies = makeSinkProxies(drivers)
const sources = callDrivers(drivers, sinkProxies)
const sinks = runMain(main, sources, disposableStream)
replicateMany(sinks, sinkProxies)
const sinks = assertSinks(main(sources))
replicateMany({sinks, sinkProxies, disposableStream, onError})

const dispose = () => {
disposableSink.add(1)
Object.keys(sinkProxies).forEach(key => sinkProxies[key].sink.end())
disposableSink.end()
function dispose() {
disposableObserver.next(1)
disposableObserver.complete()
}

return {sinks, sources, dispose}
Expand Down

0 comments on commit f942712

Please sign in to comment.