Skip to content
This repository has been archived by the owner on Dec 27, 2022. It is now read-only.

Add experimental.datPeers Lab API #1039

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/background-process/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ var userSetupStatusLookupPromise
var browserEvents = new EventEmitter()

process.on('unhandledRejection', (reason, p) => {
console.error('Unhandled Rejection at: Promise', p, 'reason:', reason)
debug('Unhandled Rejection at: Promise', p, 'reason:', reason)
})
process.on('uncaughtException', (err) => {
console.error('Uncaught exception:', err)
debug('Uncaught exception:', err)
})

Expand Down
179 changes: 179 additions & 0 deletions app/background-process/networks/dat/extensions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import EventEmitter from 'events'
import emitStream from 'emit-stream'
import {DatSessionDataExtMsg} from '@beaker/dat-session-data-ext-msg'
import {DatEphemeralExtMsg} from '@beaker/dat-ephemeral-ext-msg'

// globals
// =

var datSessionDataExtMsg = new DatSessionDataExtMsg()
var datEphemeralExtMsg = new DatEphemeralExtMsg()

// exported api
// =

export function setup () {
datEphemeralExtMsg.on('message', onEphemeralMsg)
datSessionDataExtMsg.on('session-data', onSessionDataMsg)
}

// call this on every archive created in the library
export function attach (archive) {
datEphemeralExtMsg.watchDat(archive)
datSessionDataExtMsg.watchDat(archive)
archive._datPeersEvents = new EventEmitter()
archive._datPeersOnPeerAdd = (peer) => onPeerAdd(archive, peer)
archive._datPeersOnPeerRemove = (peer) => onPeerRemove(archive, peer)
archive.metadata.on('peer-add', archive._datPeersOnPeerAdd)
archive.metadata.on('peer-remove', archive._datPeersOnPeerRemove)
}

// call this on every archive destroyed in the library
export function detach (archive) {
datEphemeralExtMsg.unwatchDat(archive)
datSessionDataExtMsg.unwatchDat(archive)
delete archive._datPeersEvents
archive.metadata.removeListener('peer-add', archive._datPeersOnPeerAdd)
archive.metadata.removeListener('peer-remove', archive._datPeersOnPeerRemove)
}

// impl for datPeers.list()
export function listPeers (archive) {
return archive.metadata.peers.map(internalPeerObj => createWebAPIPeerObj(archive, internalPeerObj))
}

// impl for datPeers.getPeer(peerId)
export function getPeer (archive, peerId) {
var internalPeerObj = archive.metadata.peers.find(internalPeerObj => getPeerId(internalPeerObj) === peerId)
return createWebAPIPeerObj(archive, internalPeerObj)
}

// impl for datPeers.broadcast(msg)
export function broadcastEphemeralMessage (archive, payload) {
datEphemeralExtMsg.broadcast(archive, encodeEphemeralMsg(payload))
}

// impl for datPeers.send(peerId, msg)
export function sendEphemeralMessage (archive, peerId, payload) {
datEphemeralExtMsg.send(archive, peerId, encodeEphemeralMsg(payload))
}

// impl for datPeers.getSessionData()
export function getSessionData (archive) {
return decodeSessionData(datSessionDataExtMsg.getLocalSessionData(archive))
}

// impl for datPeers.getSessionData(data)
export function setSessionData (archive, sessionData) {
datSessionDataExtMsg.setLocalSessionData(archive, encodeSessionData(sessionData))
}

export function createDatPeersStream (archive) {
return emitStream(archive._datPeersEvents)
}

// events
// =

function onPeerAdd (archive, internalPeerObj) {
if (getPeerId(internalPeerObj)) onHandshook()
else internalPeerObj.stream.stream.on('handshake', onHandshook)

function onHandshook () {
var peerId = getPeerId(internalPeerObj)

// send session data
if (datSessionDataExtMsg.getLocalSessionData(archive)) {
datSessionDataExtMsg.sendLocalSessionData(archive, peerId)
}

// emit event
archive._datPeersEvents.emit('connect', {
peerId,
sessionData: getPeerSessionData(archive, peerId)
})
}
}

function onPeerRemove (archive, internalPeerObj) {
var peerId = getPeerId(internalPeerObj)
if (peerId) {
archive._datPeersEvents.emit('disconnect', {
peerId,
sessionData: getPeerSessionData(archive, peerId)
})
}
}

function onEphemeralMsg (archive, internalPeerObj, msg) {
var peerId = getPeerId(internalPeerObj)
archive._datPeersEvents.emit('message', {
peerId,
sessionData: getPeerSessionData(archive, peerId),
message: decodeEphemeralMsg(msg)
})
}

function onSessionDataMsg (archive, internalPeerObj, sessionData) {
archive._datPeersEvents.emit('session-data', {
peerId: getPeerId(internalPeerObj),
sessionData: decodeSessionData(sessionData)
})
}

// internal methods
// =

function getPeerId (internalPeerObj) {
var feedStream = internalPeerObj.stream
var protocolStream = feedStream.stream
return protocolStream.remoteId ? protocolStream.remoteId.toString('hex') : null
}

function getPeerSessionData (archive, peerId) {
return decodeSessionData(datSessionDataExtMsg.getSessionData(archive, peerId))
}

function createWebAPIPeerObj (archive, internalPeerObj) {
var id = getPeerId(internalPeerObj)
var sessionData = getPeerSessionData(archive, id)
return {id, sessionData}
}

function encodeEphemeralMsg (payload) {
var contentType
if (Buffer.isBuffer(payload)) {
contentType = 'application/octet-stream'
} else {
contentType = 'application/json'
payload = Buffer.from(JSON.stringify(payload), 'utf8')
}
return {contentType, payload}
}

function decodeEphemeralMsg (msg) {
var payload
if (msg.contentType === 'application/json') {
try {
payload = JSON.parse(msg.payload.toString('utf8'))
} catch (e) {
console.error('Failed to parse ephemeral message', e, msg)
payload = null
}
}
return payload
}

function encodeSessionData (obj) {
return Buffer.from(JSON.stringify(obj), 'utf8')
}

function decodeSessionData (sessionData) {
if (!sessionData || sessionData.length === 0) return null
try {
return JSON.parse(sessionData.toString('utf8'))
} catch (e) {
console.error('Failed to parse local session data', e, sessionData)
return null
}
}
20 changes: 15 additions & 5 deletions app/background-process/networks/dat/library.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import * as archivesDb from '../../dbs/archives'
import * as datGC from './garbage-collector'
import * as folderSync from './folder-sync'
import {addArchiveSwarmLogging} from './logging-utils'
import * as datExtensions from './extensions'
import hypercoreProtocol from 'hypercore-protocol'
import hyperdrive from 'hyperdrive'


// network modules
import swarmDefaults from 'datland-swarm-defaults'
import discoverySwarm from 'discovery-swarm'
Expand Down Expand Up @@ -105,6 +107,9 @@ export function setup ({logfilePath}) {
})
})

// setup extensions
datExtensions.setup()

// setup the archive swarm
datGC.setup()
archiveSwarm = discoverySwarm(swarmDefaults({
Expand All @@ -130,6 +135,10 @@ export function createEventStream () {
return emitStream(archivesEvents)
}

export function createDebugStream () {
return emitStream(debugEvents)
}

export function getDebugLog (key) {
return new Promise((resolve, reject) => {
let rs = debugLogFile.createReadStream()
Expand All @@ -146,10 +155,6 @@ export function getDebugLog (key) {
})
}

export function createDebugStream () {
return emitStream(debugEvents)
}

// read metadata for the archive, and store it in the meta db
export async function pullLatestArchiveMeta (archive, {updateMTime} = {}) {
try {
Expand Down Expand Up @@ -348,6 +353,9 @@ async function loadArchiveInner (key, secretKey, userSettings = null) {
await updateSizeTracking(archive)
archivesDb.touch(key).catch(err => console.error('Failed to update lastAccessTime for archive', key, err))

// attach extensions
datExtensions.attach(archive)

// store in the discovery listing, so the swarmer can find it
// but not yet in the regular archives listing, because it's not fully loaded
archivesByDKey[datEncoding.toStr(archive.discoveryKey)] = archive
Expand Down Expand Up @@ -428,6 +436,7 @@ export async function unloadArchive (key) {
archive.fileActStream.end()
archive.fileActStream = null
}
datExtensions.detach(archive)
await new Promise((resolve, reject) => {
archive.close(err => {
if (err) reject(err)
Expand Down Expand Up @@ -661,7 +670,8 @@ function createReplicationStream (info) {
var stream = hypercoreProtocol({
id: networkId,
live: true,
encrypt: true
encrypt: true,
extensions: ['ephemeral', 'session-data']
})
stream.peerInfo = info

Expand Down
23 changes: 18 additions & 5 deletions app/background-process/test-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,15 @@ const METHODS = {
var page = pages.get(${page})
page.navbarEl.querySelector('.nav-location-input').value = "${url}"
page.navbarEl.querySelector('.nav-location-input').blur()
var loadPromise = new Promise(resolve => {
function onDidStopLoading () {
page.webviewEl.removeEventListener('did-stop-loading', onDidStopLoading)
resolve()
}
page.webviewEl.addEventListener('did-stop-loading', onDidStopLoading)
})
page.loadURL("${url}")
loadPromise
`)
},

Expand All @@ -64,11 +72,16 @@ const METHODS = {
},

async executeJavascriptOnPage (page, js) {
var res = await execute(`
var page = pages.get(${page})
page.webviewEl.getWebContents().executeJavaScript(\`` + js + `\`)
`)
return res
try {
var res = await execute(`
var page = pages.get(${page})
page.webviewEl.getWebContents().executeJavaScript(\`` + js + `\`)
`)
return res
} catch (e) {
console.error('Failed to execute javascript on page', js, e)
throw e
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions app/background-process/web-apis.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ import datArchiveAPI from './web-apis/dat-archive'
// experimental manifests
import experimentalLibraryManifest from '../lib/api-manifests/external/experimental/library'
import experimentalGlobalFetchManifest from '../lib/api-manifests/external/experimental/global-fetch'
import experimentalDatPeersManifest from '../lib/api-manifests/external/experimental/dat-peers'

// experimental apis
import experimentalLibraryAPI from './web-apis/experimental/library'
import experimentalGlobalFetchAPI from './web-apis/experimental/global-fetch'
import experimentalDatPeersAPI from './web-apis/experimental/dat-peers'

// exported api
// =
Expand All @@ -58,4 +60,5 @@ export function setup () {
// experimental apis
rpc.exportAPI('experimental-library', experimentalLibraryManifest, experimentalLibraryAPI, secureOnly)
rpc.exportAPI('experimental-global-fetch', experimentalGlobalFetchManifest, experimentalGlobalFetchAPI, secureOnly)
rpc.exportAPI('experimental-dat-peers', experimentalDatPeersManifest, experimentalDatPeersAPI, secureOnly)
}
77 changes: 77 additions & 0 deletions app/background-process/web-apis/experimental/dat-peers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import parseDatURL from 'parse-dat-url'
import * as datLibrary from '../../networks/dat/library'
import datDns from '../../networks/dat/dns'
import * as datExtensions from '../../networks/dat/extensions'
import {checkLabsPerm} from '../../ui/permissions'
import {DAT_HASH_REGEX} from '../../../lib/const'
import {PermissionsError} from 'beaker-error-constants'

// constants
// =

const API_DOCS_URL = 'https://TODO' // TODO
const API_PERM_ID = 'experimentalDatPeers'
const LAB_API_ID = 'datPeers'
const LAB_PERMS_OBJ = {perm: API_PERM_ID, labApi: LAB_API_ID, apiDocsUrl: API_DOCS_URL}

// exported api
// =

export default {
async list () {
await checkLabsPerm(Object.assign({sender: this.sender}, LAB_PERMS_OBJ))
var archive = await getSenderArchive(this.sender)
return datExtensions.listPeers(archive)
},

async get (peerId) {
await checkLabsPerm(Object.assign({sender: this.sender}, LAB_PERMS_OBJ))
var archive = await getSenderArchive(this.sender)
return datExtensions.getPeer(archive, peerId)
},

async broadcast (data) {
await checkLabsPerm(Object.assign({sender: this.sender}, LAB_PERMS_OBJ))
var archive = await getSenderArchive(this.sender)
return datExtensions.broadcastEphemeralMessage(archive, data)
},

async send (peerId, data) {
await checkLabsPerm(Object.assign({sender: this.sender}, LAB_PERMS_OBJ))
var archive = await getSenderArchive(this.sender)
return datExtensions.sendEphemeralMessage(archive, peerId, data)
},

async getSessionData () {
await checkLabsPerm(Object.assign({sender: this.sender}, LAB_PERMS_OBJ))
var archive = await getSenderArchive(this.sender)
return datExtensions.getSessionData(archive)
},

async setSessionData (sessionData) {
await checkLabsPerm(Object.assign({sender: this.sender}, LAB_PERMS_OBJ))
var archive = await getSenderArchive(this.sender)
return datExtensions.setSessionData(archive, sessionData)
},

async createEventStream () {
await checkLabsPerm(Object.assign({sender: this.sender}, LAB_PERMS_OBJ))
var archive = await getSenderArchive(this.sender)
return datExtensions.createDatPeersStream(archive)
}
}

// internal methods
// =

async function getSenderArchive (sender) {
var url = sender.getURL()
if (!url.startsWith('dat:')) {
throw new PermissionsError('Only dat:// sites can use the datPeers API')
}
var urlp = parseDatURL(url)
if (!DAT_HASH_REGEX.test(urlp.host)) {
urlp.host = await datDns.resolveName(url)
}
return datLibrary.getArchive(urlp.host)
}
Loading