Skip to content

Commit

Permalink
ws server ping/pong and GC logic
Browse files Browse the repository at this point in the history
  • Loading branch information
robdiciuccio committed Mar 5, 2021
1 parent 08531f5 commit 1ecf689
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 33 deletions.
2 changes: 2 additions & 0 deletions superset-websocket/client-ws-app/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
npm start
open http://127.0.0.1:3000
4 changes: 2 additions & 2 deletions superset-websocket/client-ws-app/public/javascripts/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ function connect() {
if(socketCount >= tokens.length) return;

// using https://github.com/js-cookie/js-cookie
Cookies.set('jwt', tokens[socketCount], { expires: 1, path: '' });
Cookies.set('async-token', tokens[socketCount], { expires: 1, path: '' });

// Create WebSocket connection.
let url = `ws://localhost:8080?last_id=${ts()}`;
let url = `ws://127.0.0.1:8080?last_id=${ts()}`;
const socket = new WebSocket(url);

// Connection opened
Expand Down
109 changes: 79 additions & 30 deletions superset-websocket/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as http from 'http';
import * as net from 'net';
import WebSocket from 'ws';
import { v4 as uuidv4 } from 'uuid';

const config = require('./config.json');
const jwt = require('jsonwebtoken');
Expand All @@ -10,6 +11,7 @@ const Redis = require('ioredis');
type StreamResult = [recordId: string, record: [label: 'data', data: string]];
type ListenerFunction = (results: StreamResult[]) => void;
interface EventValue {
id: string,
channel_id: string,
job_id: string,
user_id?: string,
Expand All @@ -19,9 +21,10 @@ interface EventValue {
}
interface JwtPayload { channel: string };
interface FetchRangeFromStreamParams { sessionId: string, startId: string, endId: string, listener: ListenerFunction }
interface SocketInstance { ws: WebSocket, channel: string, pongTs: number };

interface ChannelValue {
sockets: Array<any>, // TODO
sockets: Array<string>,
}

const opts = {
Expand All @@ -37,6 +40,7 @@ const opts = {
jwtCookieName: "async-token",
redisStreamReadCount: 100,
redisStreamReadBlockMs: 5000,
socketResponseTimeoutMs: 30 * 1000,
debug: false,
}

Expand All @@ -47,32 +51,36 @@ if(opts.jwtSecret.length < 32)

const redis = new Redis(opts.redis);
const httpServer = http.createServer();
const wss = new WebSocket.Server({ noServer: true });
const wss = new WebSocket.Server({ noServer: true, clientTracking: false });

const socketActiveStates = [WebSocket.OPEN, WebSocket.CONNECTING];
const globalEventStreamName: string = `${opts.streamPrefix}full`;
let channels: Record<string, ChannelValue> = {};
let sockets: Record<string, SocketInstance> = {};
let lastFirehoseId: string = '$';
let redisRequestCount: number = 0;

function sendToChannel(channel: string, value: EventValue): void {
console.log('*** sendToChannel', channel, value);
const strData = JSON.stringify(value);
if(!channels[channel]) {
console.log(`channel ${channel} is unknown, skipping`);
return;
}
channels[channel].sockets.forEach(ws => {
ws.send(strData);
channels[channel].sockets.forEach(socketId => {
const socketInstance: SocketInstance = sockets[socketId];
try {
socketInstance.ws.send(strData);
} catch(err) {
console.log('Error sending to socket', err);
cleanChannel(channel);
}
});
console.log(`${strData} sent to ${channel}`);
}

async function fetchRangeFromStream({sessionId, startId, endId, listener}: FetchRangeFromStreamParams) {
console.log('fetchRangeFromStream', sessionId, startId, endId);
const streamName = `${opts.streamPrefix}${sessionId}`;
try {
const reply = await redis.xrange(streamName, startId, endId);
console.log('*** fetchRangeFromStream reply', reply);
if (!reply || !reply.length) return;
listener(reply);
} catch(e) {
Expand All @@ -81,11 +89,8 @@ async function fetchRangeFromStream({sessionId, startId, endId, listener}: Fetch
}

async function subscribeToGlobalStream(stream: string, listener: ListenerFunction) {
console.log(`subscribeToGlobalStream`, stream, listener);

while (true) {
try {
console.log('*** lastFirehoseId', lastFirehoseId);
const reply = await redis.xread(
'BLOCK',
opts.redisStreamReadBlockMs,
Expand All @@ -95,7 +100,6 @@ async function subscribeToGlobalStream(stream: string, listener: ListenerFunctio
stream,
lastFirehoseId
);
console.log('*** firehose reply', reply);
if (!reply) {
continue
}
Expand All @@ -114,12 +118,12 @@ async function subscribeToGlobalStream(stream: string, listener: ListenerFunctio
}

function processStreamResults(results: StreamResult[]): void {
console.log('process results', results);
results.forEach((item) => {
try {
const id = item[0];
const data = JSON.parse(item[1][1]);
console.log('data', data);
sendToChannel(data.channel_id, data);
sendToChannel(data.channel_id, {id, ...data});
} catch(err) {
console.log(err);
}
Expand All @@ -129,7 +133,6 @@ function processStreamResults(results: StreamResult[]): void {
function getJwtPayload(request: http.IncomingMessage): JwtPayload {
const cookies = cookie.parse(request.headers.cookie);
const token = cookies[opts.jwtCookieName];
console.log('cookies', cookies, token);

if(!token) throw new Error('JWT not present');
return jwt.verify(token, opts.jwtSecret);
Expand All @@ -138,23 +141,24 @@ function getJwtPayload(request: http.IncomingMessage): JwtPayload {
function getLastId(request: http.IncomingMessage): string | null {
const url = new URL(String(request.url), 'http://0.0.0.0');
const queryParams = url.searchParams;
console.log('queryParams', queryParams);
return queryParams.get('last_id');
}

wss.on('connection', function connection(ws: WebSocket, request: http.IncomingMessage) {
const jwtPayload: JwtPayload = getJwtPayload(request);
const channel: string = jwtPayload.channel;
const instance: SocketInstance = { ws, channel, pongTs: Date.now() }

const socketId = uuidv4();
sockets[socketId] = instance;

if(channel in channels) {
channels[channel].sockets.push(ws)
channels[channel].sockets.push(socketId)
} else {
channels[channel] = {sockets: [ws]};
channels[channel] = {sockets: [socketId]};
}

const lastId = getLastId(request);
console.log('lastId', lastId);

if(lastId) {
const endId = (lastFirehoseId === '$' ? '+' : lastFirehoseId);
fetchRangeFromStream({
Expand All @@ -165,10 +169,16 @@ wss.on('connection', function connection(ws: WebSocket, request: http.IncomingMe
});
}

// ws.on('message', function message(msg: string) {
// console.log(`Received message ${msg} on channel ${ws.channel}`);
// sendToChannel(ws.channel, { msg: `received message on channel ${ws.channel}`});
// });
ws.on('pong', function pong(data: Buffer) {
const socketId = data.toString();
console.log('pong', socketId);
const socketInstance = sockets[socketId];
if (!socketInstance) {
console.warn(`pong received for nonexistent socket ${socketId}`);
} else {
socketInstance.pongTs = Date.now();
}
});
});

httpServer.on('upgrade', function upgrade(request: http.IncomingMessage, socket: net.Socket, head: Buffer) {
Expand All @@ -190,11 +200,50 @@ httpServer.on('upgrade', function upgrade(request: http.IncomingMessage, socket:
httpServer.listen(opts.port);
console.log(`Server started on port ${opts.port}`);

if(opts.debug) {
setInterval(() => {
console.log('total connected sockets', wss.clients.size);
console.log('redis request count', redisRequestCount);
}, 2000)
subscribeToGlobalStream(globalEventStreamName, processStreamResults);

// Connection cleanup and garbage collection

const checkSockets = () => {
console.log('*** checkSockets', Object.keys(sockets).length);
for (const socketId in sockets) {
const socketInstance = sockets[socketId];
const timeout = Date.now() - socketInstance.pongTs;
if(timeout > opts.socketResponseTimeoutMs) {
console.log(`terminating socket ${socketId} for channel ${socketInstance.channel}`);
socketInstance.ws.terminate();
delete sockets[socketId];
} else {
socketInstance.ws.ping(socketId);
}
}
}

subscribeToGlobalStream(globalEventStreamName, processStreamResults);
const cleanChannel = (channel: string) => {
const activeSockets: string[] = channels[channel].sockets.filter(socketId => {
const socketInstance = sockets[socketId];
if (!socketInstance) return false;
if (socketActiveStates.includes(socketInstance.ws.readyState)) return true;
return false;
});

if(activeSockets.length === 0) {
delete channels[channel];
} else {
channels[channel].sockets = activeSockets;
}
}

const checkSocketsInterval = setInterval(checkSockets, 5000);
const cleanChannelInterval = setInterval(function gc() {
for (const channel in channels) {
cleanChannel(channel);
}
}, 60000);

// if(opts.debug) {
// setInterval(() => {
// console.log('total connected sockets', wss.clients.size);
// console.log('redis request count', redisRequestCount);
// }, 2000)
// }
11 changes: 11 additions & 0 deletions superset-websocket/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion superset-websocket/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
"scripts": {
"start": "node index.js",
"test": "echo \"Error: no test specified\" && exit 1",
"start-dev": "ts-node index.ts",
"dev-server": "ts-node index.ts",
"build": "tsc"
},
"license": "Apache-2.0",
"dependencies": {
"cookie": "^0.4.1",
"ioredis": "^4.16.1",
"jsonwebtoken": "^8.5.1",
"uuid": "^8.3.2",
"ws": "^7.4.2"
},
"devDependencies": {
"@types/node": "^14.14.22",
"@types/uuid": "^8.3.0",
"@types/ws": "^7.4.0",
"ts-node": "^9.1.1",
"typescript": "^4.1.3"
Expand Down

0 comments on commit 1ecf689

Please sign in to comment.