Skip to content

Commit

Permalink
feat: Implement token callback; fix CI testing (#439)
Browse files Browse the repository at this point in the history
setAuth will automatically use a accessToken callback to fetch the token at the right time instead of assuming that the right token is present

---------

Co-authored-by: Kamil Ogórek <kamil.ogorek@gmail.com>
  • Loading branch information
filipecabaco and kamilogorek authored Dec 3, 2024
1 parent 9c8a5d3 commit e70fa7e
Show file tree
Hide file tree
Showing 7 changed files with 465 additions and 688 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"jsdom": "^16.7.0",
"jsdom-global": "3.0.0",
"jsonwebtoken": "^9.0.2",
"mock-socket": "^9.0.3",
"mock-socket": "^9.3.1",
"npm-run-all": "^4.1.5",
"nyc": "^15.1.0",
"prettier": "^2.1.2",
Expand Down
148 changes: 70 additions & 78 deletions src/RealtimeChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ export enum REALTIME_SUBSCRIBE_STATES {

export const REALTIME_CHANNEL_STATES = CHANNEL_STATES

interface PostgresChangesFilters {
postgres_changes: {
id: string
event: string
schema?: string
table?: string
filter?: string
}[]
}
/** A channel is the basic building block of Realtime
* and narrows the scope of data flow to subscribed clients.
* You can think of a channel as a chatroom where participants are able to see who's online
Expand Down Expand Up @@ -202,21 +211,23 @@ export default class RealtimeChannel {

/** Subscribe registers your client with the server */
subscribe(
callback?: (status: `${REALTIME_SUBSCRIBE_STATES}`, err?: Error) => void,
callback?: (status: REALTIME_SUBSCRIBE_STATES, err?: Error) => void,
timeout = this.timeout
): RealtimeChannel {
if (!this.socket.isConnected()) {
this.socket.connect()
}

if (this.joinedOnce) {
throw `tried to subscribe multiple times. 'subscribe' can only be called a single time per channel instance`
} else {
const {
config: { broadcast, presence, private: isPrivate },
} = this.params
this._onError((e: Error) => callback && callback('CHANNEL_ERROR', e))
this._onClose(() => callback && callback('CLOSED'))

this._onError((e: Error) =>
callback?.(REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR, e)
)
this._onClose(() => callback?.(REALTIME_SUBSCRIBE_STATES.CLOSED))

const accessTokenPayload: { access_token?: string } = {}
const config = {
Expand All @@ -227,8 +238,8 @@ export default class RealtimeChannel {
private: isPrivate,
}

if (this.socket.accessToken) {
accessTokenPayload.access_token = this.socket.accessToken
if (this.socket.accessTokenValue) {
accessTokenPayload.access_token = this.socket.accessTokenValue
}

this.updateJoinPayload({ ...{ config }, ...accessTokenPayload })
Expand All @@ -237,85 +248,67 @@ export default class RealtimeChannel {
this._rejoin(timeout)

this.joinPush
.receive(
'ok',
({
postgres_changes: serverPostgresFilters,
}: {
postgres_changes: {
id: string
event: string
schema?: string
table?: string
filter?: string
}[]
}) => {
this.socket.accessToken &&
this.socket.setAuth(this.socket.accessToken)

if (serverPostgresFilters === undefined) {
callback && callback('SUBSCRIBED')
return
} else {
const clientPostgresBindings = this.bindings.postgres_changes
const bindingsLen = clientPostgresBindings?.length ?? 0
const newPostgresBindings = []

for (let i = 0; i < bindingsLen; i++) {
const clientPostgresBinding = clientPostgresBindings[i]
const {
filter: { event, schema, table, filter },
} = clientPostgresBinding
const serverPostgresFilter =
serverPostgresFilters && serverPostgresFilters[i]

if (
serverPostgresFilter &&
serverPostgresFilter.event === event &&
serverPostgresFilter.schema === schema &&
serverPostgresFilter.table === table &&
serverPostgresFilter.filter === filter
) {
newPostgresBindings.push({
...clientPostgresBinding,
id: serverPostgresFilter.id,
})
} else {
this.unsubscribe()
callback &&
callback(
'CHANNEL_ERROR',
new Error(
'mismatch between server and client bindings for postgres changes'
)
)
return
}
.receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => {
this.socket.setAuth()
if (postgres_changes === undefined) {
callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
return
} else {
const clientPostgresBindings = this.bindings.postgres_changes
const bindingsLen = clientPostgresBindings?.length ?? 0
const newPostgresBindings = []

for (let i = 0; i < bindingsLen; i++) {
const clientPostgresBinding = clientPostgresBindings[i]
const {
filter: { event, schema, table, filter },
} = clientPostgresBinding
const serverPostgresFilter =
postgres_changes && postgres_changes[i]

if (
serverPostgresFilter &&
serverPostgresFilter.event === event &&
serverPostgresFilter.schema === schema &&
serverPostgresFilter.table === table &&
serverPostgresFilter.filter === filter
) {
newPostgresBindings.push({
...clientPostgresBinding,
id: serverPostgresFilter.id,
})
} else {
this.unsubscribe()
callback?.(
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
new Error(
'mismatch between server and client bindings for postgres changes'
)
)
return
}
}

this.bindings.postgres_changes = newPostgresBindings
this.bindings.postgres_changes = newPostgresBindings

callback && callback('SUBSCRIBED')
return
}
callback && callback(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
return
}
)
})
.receive('error', (error: { [key: string]: any }) => {
callback &&
callback(
'CHANNEL_ERROR',
new Error(
JSON.stringify(Object.values(error).join(', ') || 'error')
)
callback?.(
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
new Error(
JSON.stringify(Object.values(error).join(', ') || 'error')
)
)
return
})
.receive('timeout', () => {
callback && callback('TIMED_OUT')
callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT)
return
})
}

return this
}

Expand Down Expand Up @@ -445,12 +438,13 @@ export default class RealtimeChannel {
): Promise<RealtimeChannelSendResponse> {
if (!this._canPush() && args.type === 'broadcast') {
const { event, payload: endpoint_payload } = args
const authorization = this.socket.accessTokenValue
? `Bearer ${this.socket.accessTokenValue}`
: ''
const options = {
method: 'POST',
headers: {
Authorization: this.socket.accessToken
? `Bearer ${this.socket.accessToken}`
: '',
Authorization: authorization,
apikey: this.socket.apiKey ? this.socket.apiKey : '',
'Content-Type': 'application/json',
},
Expand Down Expand Up @@ -523,7 +517,6 @@ export default class RealtimeChannel {

return new Promise((resolve) => {
const leavePush = new Push(this, CHANNEL_EVENTS.leave, {}, timeout)

leavePush
.receive('ok', () => {
onClose()
Expand All @@ -538,7 +531,6 @@ export default class RealtimeChannel {
})

leavePush.send()

if (!this._canPush()) {
leavePush.trigger('ok', {})
}
Expand Down
Loading

0 comments on commit e70fa7e

Please sign in to comment.