Skip to content

Commit

Permalink
WS client: Add convenience method to connect to the event WebSocket (o…
Browse files Browse the repository at this point in the history
…penhab#2993)

Also improve the WS client code in general.

Signed-off-by: Florian Hotze <dev@florianhotze.com>
  • Loading branch information
florian-h05 authored Jan 8, 2025
1 parent ea5121d commit a9954c3
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 26 deletions.
119 changes: 96 additions & 23 deletions bundles/org.openhab.ui/web/src/js/openhab/ws.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,55 @@
import Framework7 from 'framework7'
import { getAccessToken } from './auth'

const HEARTBEAT_MESSAGE = `{
"type": "WebSocketEvent",
"topic": "openhab/websocket/heartbeat",
"payload": "PING",
"source": "WebSocketTestInstance"
}`
/**
* Build a heartbeat message for the given WebSocket client id.
* @param {string} id WS client id
* @return {string}
*/
function heartbeatMessage (id) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/heartbeat',
payload: 'PING',
source: id
})
}

function arrayToSerialisedString (arr) {
return '[' + arr.map(e => '"' + e + '"').join(',') + ']'
}

/**
* Build a event source filter message for the given WebSocket client id and the given sources.
* Source filters can be used to remove events from a specific source from the event WS.
* @param {string} id WS client id
* @param {string[]} sources event sources to filter out
* @return {string}
*/
function eventSourceFilterMessage (id, sources) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/source',
payload: arrayToSerialisedString(sources),
source: id
})
}

/**
* Build an event type filter message for the given WebSocket client id and the given event types.
* Event type filters can be used to select a sub-set of all available events for the event WS.
* @param {string} id WS client id
* @param types
* @return {string}
*/
function eventTypeFilterMessage (id, types) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/type',
payload: arrayToSerialisedString(types),
source: id
})
}

const openWSConnections = []

Expand All @@ -14,12 +58,12 @@ function newWSConnection (path, messageCallback, readyCallback, errorCallback, h
// Create a new WebSocket connection
const socket = new WebSocket(path, [`org.openhab.ws.accessToken.base64.${encodedToken}`, 'org.openhab.ws.protocol.default'])

socket.id = 'ui-' + Framework7.utils.id()

// Handle WebSocket connection opened
socket.onopen = (event) => {
socket.setKeepalive(heartbeatInterval)
if (readyCallback) {
readyCallback(event)
}
if (readyCallback) readyCallback(event)
}

// Handle WebSocket message received
Expand All @@ -42,15 +86,12 @@ function newWSConnection (path, messageCallback, readyCallback, errorCallback, h
}

// WebSocket keep alive
socket.setKeepalive = (seconds = 5) => {
socket.setKeepalive = (seconds) => {
if (!heartbeatCallback) return
console.debug('Setting keepalive interval seconds', seconds)
socket.clearKeepalive()
socket.keepaliveTimer = setInterval(() => {
if (heartbeatCallback) {
heartbeatCallback()
} else {
socket.send(HEARTBEAT_MESSAGE)
}
heartbeatCallback()
}, seconds * 1000)
}

Expand All @@ -69,21 +110,53 @@ function newWSConnection (path, messageCallback, readyCallback, errorCallback, h

export default {
/**
* Connect to the websocket at the given path.
* Connect to the WebSocket at the given path.
* This method provides raw access to WebSockets, the caller has to take care of the keepalive mechanism by specifying a heartbeat callback.
*
* @param {string} path path to connect to, e.g. `/ws`
* @param {fn} messageCallback
* @param {fn} [readyCallback=null]
* @param {fn} [errorCallback=null]
* @param {fn} [heartbeatCallback=null] heartbeat callback to use instead of the default PING/PONG
* @param {fn} messageCallback message callback to handle incoming messages
* @param {fn} heartbeatCallback heartbeat callback
* @param {fn} [readyCallback] ready callback
* @param {fn} [errorCallback] error callback
* @param {number} [heartbeatInterval=5] heartbeat interval in seconds
* @return {WebSocket}
*/
connect (path, messageCallback, readyCallback = null, errorCallback = null, heartbeatCallback = null, heartbeatInterval = 5) {
connect (path, messageCallback, heartbeatCallback, readyCallback, errorCallback, heartbeatInterval = 5) {
return newWSConnection(path, messageCallback, readyCallback, errorCallback, heartbeatCallback, heartbeatInterval)
},
/**
* Close the given websocket connection.
* Connect to the event WebSocket, which provides direct access to the EventBus.
* This convenience method takes care of the keepalive mechanism as well as filter setup.
*
* @param {string[]} types array of event types to filter by, if empty all events are received
* @param {fn} messageCallback message callback to handle incoming messages
* @param {fn} [readyCallback] ready callback
* @param {fn} [errorCallback] error callback
* @return {WebSocket}
*/
events (types, messageCallback, readyCallback, errorCallback) {
let socket

const extendedMessageCallback = (event) => {
if (event.type === 'WebSocketEvent') return
messageCallback(event)
}

const extendedReadyCallback = (event) => {
socket.send(eventSourceFilterMessage(socket.id, [socket.id]))
if (Array.isArray(types) && types.length > 0) socket.send(eventTypeFilterMessage(socket.id, types))
if (readyCallback) readyCallback(event)
}

const heartbeatCallback = () => {
socket.send(heartbeatMessage(socket.id))
}

socket = this.connect('/ws/events', extendedMessageCallback, heartbeatCallback, extendedReadyCallback, errorCallback)
return socket
},
/**
* Close the given WebSocket connection.
*
* @param {WebSocket} socket
* @param {fn} [callback=null] callback to execute on connection close
Expand All @@ -100,7 +173,7 @@ export default {
callback(event)
}
}
socket.close()
socket.clearKeepalive()
socket.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export default {
this.sseEvents = []
},
startWS () {
this.wsClient = this.$oh.ws.connect('/ws/events', (event) => {
this.wsClient = this.$oh.ws.events([], (event) => {
event.time = new Date()
this.wsEvents.unshift(...[event])
this.wsEvents.splice(5)
Expand Down
4 changes: 2 additions & 2 deletions bundles/org.openhab.ui/web/src/pages/developer/log-viewer.vue
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,11 @@ export default {
this.addLogEntry(event)
}
const keepaliveCallback = () => {
const heartbeatCallback = () => {
this.socket.send('[]')
}
this.socket = this.$oh.ws.connect('/ws/logs', messageCallback, readyCallback, null, keepaliveCallback, 9)
this.socket = this.$oh.ws.connect('/ws/logs', messageCallback, heartbeatCallback, readyCallback, null, 9)
// TEMP
// for (let i = 0; i < 1980; i++) {
Expand Down

0 comments on commit a9954c3

Please sign in to comment.