-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSubscriptionBroker.ts
75 lines (65 loc) · 2.24 KB
/
SubscriptionBroker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import type { ProtocolUpdateMessage } from '@/architect'
import type { ServerWebSocket } from 'bun'
/** Per-topic state held by the broker: current listeners plus the outgoing queue. */
type Context = {
  /** One tuple per active subscription on the topic. */
  subscribers: Array<[clientId: number, subId: number, ws: ServerWebSocket<any>]>
  /** Payloads waiting to be delivered by the next publish of this topic. */
  messages: Array<unknown>
}
/**
 * In-memory publish/subscribe hub keyed by topic name.
 *
 * Each topic keeps a list of subscriptions (`[clientId, subId, ws]`) and a
 * queue of pending messages. Messages are queued with `enqueueForPublish` and
 * delivered later by `publishEnqueued` / `publishAllEnqueued`, which send one
 * JSON `update` frame per (message, subscription) pair.
 */
export default class SubscriptionBroker {
  /** Topic name -> subscribers and pending messages for that topic. */
  topics: Map<string, Context> = new Map()

  /**
   * Returns the context for `topic`, registering an empty one first if the
   * topic is not known yet. Only used by operations that add state.
   */
  private getOrCreateTopic(topic: string): Context {
    let context = this.topics.get(topic)
    if (!context) {
      context = { subscribers: [], messages: [] }
      this.topics.set(topic, context)
    }
    return context
  }

  /**
   * Registers subscription `id` of client `clientId` on `topic`.
   *
   * @param clientId - Identifier of the subscribing client.
   * @param topic - Topic name; created on first use.
   * @param id - Subscription id, echoed back as `id` in every update frame.
   * @param ws - Socket the update frames are sent on.
   */
  subscribe(clientId: number, topic: string, id: number, ws: ServerWebSocket<any>) {
    this.getOrCreateTopic(topic).subscribers.push([clientId, id, ws])
  }

  /**
   * Removes the subscription identified by (`clientId`, `id`) from `topic`.
   * Does nothing when the topic is unknown.
   */
  unsubscribe(clientId: number, topic: string, id: number) {
    // Plain lookup: removing from a topic nobody created must not create it.
    const context = this.topics.get(topic)
    if (!context) return
    context.subscribers = context.subscribers.filter(
      ([subClientId, subId]) => subClientId !== clientId || subId !== id
    )
  }

  /**
   * Removes every subscription of `clientId` from `topic`.
   * Does nothing when the topic is unknown.
   */
  unsubscribeAll(clientId: number, topic: string) {
    const context = this.topics.get(topic)
    if (!context) return
    context.subscribers = context.subscribers.filter(
      ([subClientId]) => subClientId !== clientId
    )
  }

  /** Removes every subscription of `clientId` from all topics. */
  unsubscribeClient(clientId: number) {
    for (const context of this.topics.values()) {
      context.subscribers = context.subscribers.filter(
        ([subClientId]) => subClientId !== clientId
      )
    }
  }

  /** Appends `message` to the pending queue of `topic`, creating the topic if needed. */
  enqueueForPublish(topic: string, message: unknown) {
    this.getOrCreateTopic(topic).messages.push(message)
  }

  /**
   * Delivers all pending messages of `topic`, in queue order, to each
   * subscriber whose socket is open, then drops subscribers whose socket is
   * no longer open. Does nothing when the topic is unknown.
   */
  publishEnqueued(topic: string) {
    const context = this.topics.get(topic)
    if (!context) return

    while (context.messages.length > 0) {
      const message = context.messages.shift()!
      for (const [, id, ws] of context.subscribers) {
        // Skip sockets that are not open: no point serializing a frame for
        // them. They are removed from the list below.
        if (ws.readyState !== WebSocket.OPEN) continue
        // TODO: pre-serialize everything but the ID
        const update: ProtocolUpdateMessage = { type: 'update', id, data: message }
        ws.send(JSON.stringify(update))
      }
    }

    // prune subscribers that have disconnected
    context.subscribers = context.subscribers.filter(
      ([, , ws]) => ws.readyState === WebSocket.OPEN
    )
  }

  /** Runs `publishEnqueued` for every known topic. */
  publishAllEnqueued() {
    for (const topic of this.topics.keys()) {
      this.publishEnqueued(topic)
    }
  }
}