Skip to content

Commit

Permalink
feat: Add custom restorable cache
Browse files Browse the repository at this point in the history
  • Loading branch information
mkg20001 committed Aug 10, 2018
1 parent 297086f commit 768576c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
"homepage": "https://github.com/Teletunnel/teletunnel-core#readme",
"dependencies": {
"debug": "^3.1.0",
"forward-addr": "~0.1.1",
"forward-addr": "~0.1.2",
"interface-connection": "~0.3.2",
"pull-cache": "0.0.0",
"pull-reader": "^1.3.1",
"pull-stream": "^3.6.8"
},
"devDependencies": {
"aegir": "^15.1.0",
"teletunnel-protocols": "~0.1.0"
"teletunnel-protocols": "~0.1.2"
},
"contributors": [
"Maciej Krüger <mkg20001@gmail.com>"
Expand Down
3 changes: 2 additions & 1 deletion src/sorting-hat.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module.exports = async function sortingHat (conn, {timeout, protocols, handlers,
let found = false

while (true) {
log('try detect, state=%s', connState.length)
log('try detect, state=%s, protos=%s', connState.length, protocols.map(p => p.name).join(';'))

const wrapped = wrapper({conn, timeout: timeout || 2000})

Expand All @@ -35,6 +35,7 @@ module.exports = async function sortingHat (conn, {timeout, protocols, handlers,
if (!res.state) return [proto.name, res]
return [proto.name, res.props, res.state] // this is for more complex protos that need to pass a state to .stream() or subprotos
} catch (e) {
log(e)
return false
}
}))
Expand Down
54 changes: 52 additions & 2 deletions src/wrapper.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,60 @@
'use strict'

const Cache = require('pull-cache')
const Reader = require('pull-reader')
const pull = require('pull-stream')
const Connection = require('interface-connection').Connection

function Cache (read) {
let restored = false
const proms = []

const nextProm = () => { // generate a promise that resolves with the next read result
if (restored) return Promise.resolve([new Error('Cached stream has been restored!')])
const prom = new Promise((resolve, reject) => read(null, (...a) => resolve(a)))
proms.push(prom)
return prom
}

let fnc = () => {
let i = 0

return (end, cb) => { // await the next cached promise
if (end) { // cached stream shouldn't end source, do as if
return cb(true) // eslint-disable-line standard/no-callback-literal
}

let prom = proms[i++] || nextProm()
prom.then((a) => cb(...a)) // eslint-disable-line standard/no-callback-literal
}
}

fnc.restore = () => { // restore the stream
restored = true
let d = false
let i = 0

return (end, cb) => {
if (d) {
return read(end, cb)
} else {
if (end) { // if we have end, forward
d = true
return read(end, cb)
}

let prom = proms[i++]
if (!prom) { // if there is no promise anymore, continue as usual
d = true
return read(end, cb)
}
prom.then((a) => cb(...a)) // eslint-disable-line standard/no-callback-literal
}
}
}

return fnc
}

module.exports = ({timeout, conn}) => {
let cache = Cache(conn.source)

Expand All @@ -26,7 +76,7 @@ module.exports = ({timeout, conn}) => {
}
},
restore: () => {
let src = cache()
let src = cache.restore()
return new Connection({
source: (end, cb) => {
if (end) return cache(end, cb)
Expand Down

0 comments on commit 768576c

Please sign in to comment.