Skip to content

Commit

Permalink
fix TunnelServer event listener leak (#103)
Browse files Browse the repository at this point in the history
Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
  • Loading branch information
jakolehm authored Aug 16, 2021
1 parent a2d2172 commit c9d726d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 71 deletions.
81 changes: 40 additions & 41 deletions src/__tests__/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ V6L11BWkpzGXSW4Hv43qa+GSYOD2QU68Mb59oSk2OB+BtOLpJofmbGEGgvmwyCI9
MwIDAQAB
-----END PUBLIC KEY-----`;

type IncomingSocket = {
connection: "open" | "close";
ws: WebSocket;
};

describe("TunnelServer", () => {
let server: TunnelServer;
const port = 51515;
Expand All @@ -33,7 +38,7 @@ describe("TunnelServer", () => {
* }
*/
const jwtToken = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJsZW5zLXVzZXIiLCJncm91cHMiOlsiZGV2Il0sImlhdCI6MTUxNjIzOTAyMiwiY2x1c3RlcklkIjoiYTAyNmU1MGQtZjliNC00YWE4LWJhMDItYzk3MjJmN2YwNjYzIiwiYXVkIjoiaHR0cDovL2xvY2FsaG9zdC9ib3JlZC9hMDI2ZTUwZC1mOWI0LTRhYTgtYmEwMi1jOTcyMmY3ZjA2NjMifQ.jkTbX_O8UWbYdCRiTv4NEgDkewEOB9QrLOHOm_Ox8BKt7DC4696bbdOwVn_VHist0g6889ms0m8Nr_RKW5BW90ItAsfDx_0cp34_WKPuMBeXYxkfAEabBbhjATfrW1IUTVtV9R_qQ71nbqlhY9UudByfETI8CanjbDP7QYZCxmVCf2HvRML3h6mS1tqHmqZvjRAHY-cFmO8qa6xLp2c1vFMxuCoSZGoGIqoNPaLKIVBbDdjxzOEjO__gQX6ksUZxsHOy13iBre8gbBVi85lhkSCZa9OtXDEAICqsrlpHZvxIYqYMgBNG0YY4sVvvDGJgDxxTyWn8lphKrZyWWtNvjw";

/**
* {
* "sub": "a026e50d-f9b4-4aa8-ba02-c9722f7f0663",
Expand All @@ -55,10 +60,7 @@ describe("TunnelServer", () => {
const sleep = (amount: number) => new Promise((resolve) => setTimeout(resolve, amount));
const get = async (path: string, headers?: Headers) => got(`http://localhost:${port}${path}`, { throwHttpErrors: false, headers });

const incomingSocket = (type = "agent", headers: { [key: string]: string } = {}, keepOpen = 10, close = true, endpoint = "connect"): Promise<{
connection: "open" | "close";
ws: WebSocket;
}> => {
const incomingSocket = (type = "agent", headers: { [key: string]: string } = {}, keepOpen = 10, close = true, endpoint = "connect"): Promise<IncomingSocket> => {
return new Promise((resolve, reject) => {
const ws = new WebSocket(`http://localhost:${port}/${type}/${endpoint}`, {
headers
Expand Down Expand Up @@ -122,7 +124,7 @@ describe("TunnelServer", () => {

const agents = server.getAgentsForClusterId("a026e50d-f9b4-4aa8-ba02-c9722f7f0663");

agents.push(new Agent(ws as any, "rsa-public-key", server));
agents.push(new Agent(ws as any, "rsa-public-key", server, "test-id"));

const res = await get("/client/public-key", { "Authorization": `Bearer ${jwtToken}`});

Expand Down Expand Up @@ -451,7 +453,7 @@ describe("TunnelServer", () => {
await expect(connect()).resolves.toHaveProperty("connection", "open");
});

it("sends empty presence json to client presence socket when socket is open", async () => {
it("sends empty presence json to client presence socket when socket is open", async (done) => {
expect.assertions(1);

const presence = await incomingSocket("client", {
Expand All @@ -463,17 +465,14 @@ describe("TunnelServer", () => {
"presence" : {
"userIds" : []
}
})
);
}));
presence.ws.close();
done();
};

await sleep(200); //waits until first message was sent

presence.ws.close();
});


it("sends presence json to client presence socket when socket is open and clients are already connected", async () => {
it("sends presence json to client presence socket when socket is open and clients are already connected", async (done) => {
expect.assertions(1);

const agent = await incomingSocket("agent", {
Expand All @@ -489,22 +488,22 @@ describe("TunnelServer", () => {
}, undefined, false, "presence");

presence.ws.onmessage = (message) => {
expect(message.data).toBe(JSON.stringify({
expect(message.data).toBe(JSON.stringify({
"presence" : {
"userIds" : ["lens-user"]
}
})
);
};

await sleep(200); //waits until first message was sent
presence.ws.close();
client.ws.close();
agent.ws.close();

presence.ws.close();
client.ws.close();
agent.ws.close();
done();
};
});

it("sends userIds per agent to client presence socket after agent and client connected", async () => {
it("sends userIds per agent to client presence socket after agent and client connected", async (done) => {
expect.assertions(1);

const presence = await incomingSocket("client", {
Expand All @@ -513,31 +512,34 @@ describe("TunnelServer", () => {

await sleep(200); //waits until first message was sent

let agent: IncomingSocket | null = null;
let client: IncomingSocket | null = null;

presence.ws.onmessage = (message) => {
expect(message.data).toBe(JSON.stringify({
console.log("message", message.data);
expect(message.data).toBe(JSON.stringify({
"presence" : {
"userIds" : ["lens-user"]
}
})
);
}));

presence.ws.close();
client?.ws.close();
agent?.ws.close();

done();
};

const agent = await incomingSocket("agent", {
agent = await incomingSocket("agent", {
"Authorization": `Bearer ${agentJwtToken}`
}, undefined, false);

const client = await incomingSocket("client", {
client = await incomingSocket("client", {
"Authorization": `Bearer ${jwtToken}`
}, undefined, false);

await sleep(100); //waits until on "ClientConnected" message was sent

presence.ws.close();
client.ws.close();
agent.ws.close();
});

it("sends empty presence json to client presence socket after agent and client connected and disconnected", async () => {
it("sends empty presence json to client presence socket after agent and client connected and disconnected", async (done) => {
expect.assertions(1);

const presence = await incomingSocket("client", {
Expand All @@ -555,21 +557,18 @@ describe("TunnelServer", () => {
}, undefined, false);

presence.ws.onmessage = (message) => {
console.log(message.data);
expect(message.data).toBe(JSON.stringify({
expect(message.data).toBe(JSON.stringify({
"presence" : {
"userIds" : []
}
})
);
}));

presence.ws.close();
done();
};

agent.ws.close();
client.ws.close();

await sleep(200); //waits until on "ClientDisconnected" message was received

presence.ws.close();
});
});
});
Expand Down
8 changes: 5 additions & 3 deletions src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ export interface Client {
export class Agent {
public socket: WebSocket;
public publicKey: string;
public clusterId: string;
public clients: Client[] = [];
private mplex: BoredMplexClient;
private server: TunnelServer;

constructor(socket: WebSocket, publicKey: string, server: TunnelServer) {
constructor(socket: WebSocket, publicKey: string, server: TunnelServer, clusterId: string) {
this.socket = socket;
this.publicKey = publicKey;
this.server = server;
this.clusterId = clusterId;

const stream = WebSocket.createWebSocketStream(this.socket);

Expand Down Expand Up @@ -77,7 +79,7 @@ export class Agent {
this.removeClient(socket);
});

this.server.emit("ClientConnected", {});
this.server.emit("ClientConnected", this.clusterId);
}

removeClient(socket: WebSocket) {
Expand All @@ -91,7 +93,7 @@ export class Agent {

client.socket.close(4410);

this.server.emit("ClientDisconnected", {});
this.server.emit("ClientDisconnected", this.clusterId);
console.log("SERVER: client disconnected");
}

Expand Down
2 changes: 1 addition & 1 deletion src/request-handlers/agent-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export function handleAgentSocket(req: IncomingMessage, socket: WebSocket, serve

console.log(`SERVER: agent connected. Cluster id: ${clusterId}`);
const publicKey = Buffer.from(req.headers["x-bored-publickey"]?.toString() || "", "base64").toString("utf-8");
const agent = new Agent(socket, publicKey, server);
const agent = new Agent(socket, publicKey, server, clusterId);

agents.push(agent);

Expand Down
33 changes: 12 additions & 21 deletions src/request-handlers/client-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function handleClientSocket(req: IncomingMessage, socket: WebSocket, serv

try {
const tokenData = verifyClientToken(authorization.token, server);

userId = tokenData.sub;
clusterId = server.agentToken === "" ? tokenData.clusterId : defaultClusterId;
} catch (error) {
Expand Down Expand Up @@ -82,29 +82,20 @@ export function handleClientPresenceSocket(req: IncomingMessage, socket: WebSock
return;
}

console.log("SERVER: client listening to user presence socket");
const presenceSockets = server.getPresenceSocketsForClusterId(clusterId);

setTimeout(function() {
sendPresenceData(socket, server, clusterId);
}, firstMessageDelay);
presenceSockets.push(socket);
socket.on("close", () => {
const index = presenceSockets.findIndex((presence) => presence === socket);

server.on("ClientConnected", () => {
sendPresenceData(socket, server, clusterId);
if (index !== -1) {
presenceSockets.splice(index, 1);
}
});

server.on("ClientDisconnected", () => {
sendPresenceData(socket, server, clusterId);
});
}

function sendPresenceData(socket: WebSocket, server: TunnelServer, clusterId: string) {
const agents = server.getAgentsForClusterId(clusterId);
console.log("SERVER: client listening to user presence socket");

socket.send(
JSON.stringify({
"presence" : {
"userIds": agents.flatMap(agent => agent.clients.map(client => client.userId))
}
})
);
setTimeout(function() {
server.sendPresenceData(socket, clusterId);
}, firstMessageDelay);
}
43 changes: 38 additions & 5 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,32 @@ import { EventEmitter } from "events";

export type ClusterId = string;
export const defaultClusterId: ClusterId = "default";
const eventEmitter = new EventEmitter();

export class TunnelServer {
export class TunnelServer extends EventEmitter {
private server?: HttpServer;
private ws?: Server;

public agentToken = "";
public idpPublicKey = "";
public tunnelAddress?: string;
public agents: Map<ClusterId, Agent[]> = new Map();
emit = eventEmitter.emit;
on = eventEmitter.on;
off = eventEmitter.off;
public presenceSockets: Map<ClusterId, WebSocket[]> = new Map();

constructor() {
super();

this.on("ClientConnected", (clusterId: string) => {
this.getPresenceSocketsForClusterId(clusterId).forEach((socket) => {
this.sendPresenceData(socket, clusterId);
});
});

this.on("ClientDisconnected", (clusterId: string) => {
this.getPresenceSocketsForClusterId(clusterId).forEach((socket) => {
this.sendPresenceData(socket, clusterId);
});
});
}

start(port = 8080, agentToken: string, idpPublicKey: string, tunnelAddress = process.env.TUNNEL_ADDRESS || ""): Promise<void> {
this.agentToken = agentToken;
Expand Down Expand Up @@ -61,6 +74,14 @@ export class TunnelServer {
return agents;
}

getPresenceSocketsForClusterId(clusterId: string) {
const sockets = this.presenceSockets.get(clusterId) || [];

if (!this.presenceSockets.has(clusterId)) this.presenceSockets.set(clusterId, sockets);

return sockets;
}

handleRequest(req: IncomingMessage, res: ServerResponse) {
if (!req.url) return;

Expand Down Expand Up @@ -126,4 +147,16 @@ export class TunnelServer {
});
}
}

sendPresenceData(socket: WebSocket, clusterId: string) {
const agents = this.getAgentsForClusterId(clusterId);

socket.send(
JSON.stringify({
"presence" : {
"userIds": agents.flatMap(agent => agent.clients.map(client => client.userId))
}
})
);
}
}

0 comments on commit c9d726d

Please sign in to comment.