-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy path_conn.jsy
106 lines (83 loc) · 3.29 KB
/
_conn.jsy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import { _interval } from './_utils.jsy'
const _defer_obj = o => @
o.p = new Promise @ (a,e) => { o.a=a; o.e=e }
, o
function _dfn_reset(client, attr, fn_after) ::
// a resetable deferred for a function
let self = {set}, afn = async (...args) => (await self.p)(...args)
return set()
function set() ::
if afn !== client[attr] ::
_defer_obj(self).p.then(fn_after, _=>0)
client[attr] = afn
return self
export function _mqtt_conn(opt, client, [on_mqtt, pkt_future]) ::
let _abort
let _dfn_send0 = _dfn_reset @ client, '_send0', // client._send0 getter/setter
_=> client.conn_emit('on_live', conn.has_connected)
let _dfn_ready = _dfn_reset @ client, '_send', // client._send getter/setter
_=> client.conn_emit('on_ready')
let _keep_alive_ival = _interval @=> client._send0('pingreq') // resettable interval for keep_alive ping
let conn = Object.create @:
ping: (td=conn.keep_alive) => _keep_alive_ival(td)
on_conn(pkt, response) ::
conn.has_connected = true
conn.keep_alive = opt.keep_alive || response[0].props?.server_keep_alive || pkt.keep_alive
client.conn_emit('on_conn')
return opt.use_auth
? response // wait on enhanced authentication step
: conn.on_auth(null, response) // otherwise, connect is also auth
on_auth(pkt, response) ::
_dfn_ready.a(_dfn_send0.p)
if 0 != opt.keep_alive ::
conn.ping()
client.conn_emit('on_auth', !pkt)
return response
on_dis(pkt, response) ::
conn.reset(false)
return response
reset(err) ::
if err ::
_dfn_send0.e(err) // send error to uses of _send0 (connect, auth)
_abort.e(err) // abort in-progress connections
delete conn.is_set
conn.ready = handshake()
client.conn_emit('on_disconnect', false===err, err)
abort() ::
_dfn_ready.e(err) // abort all messages awaiting ready state
return conn.reset(err)
async setup(gate, send_u8_pkt, init_msg_loop) ::
if conn.is_set ::
throw new Error() // already in-progress
conn.is_set = true
await gate
// setup send/recv MQTT parsing context
let mqtt_ctx = client.mqtt_ctx.mqtt_stream()
:: // setup inbound message loop
let sess_ctx = {mqtt: client} // mutable session context
let on_mqtt_chunk = u8 => on_mqtt(mqtt_ctx.decode(u8), sess_ctx)
init_msg_loop(on_mqtt_chunk, conn)
// setup outbound message path and transport connection
send_u8_pkt = await send_u8_pkt
_dfn_send0.a @
async (type, pkt, key) => ::
let res = undefined !== key
? pkt_future(key) : true
await send_u8_pkt @
mqtt_ctx.encode_pkt(type, pkt)
return res
conn.ready = handshake()
return conn
async function handshake() ::
_abort = _defer_obj({})
_keep_alive_ival(0) // clearInterval on keep alive ping
_dfn_send0.set() // reset client._send0 if necessary
_dfn_ready.set() // reset client._send if necessary
try ::
// set client._send0 as passtrhough after transport connection
client._send0 = await Promise.race @# _dfn_send0.p, _abort.p
// set client._send as passtrhough after ready
client._send = await Promise.race @# _dfn_ready.p, _abort.p
return true
catch err ::
return false