-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsocket.py
92 lines (76 loc) · 3.16 KB
/
socket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#
# Copyright © 2023 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
"""Protocol handler for GCN socket connection."""
import asyncio
import logging
import struct
import confluent_kafka
import gcn
import lxml.etree
from . import metrics
from .common import notice_type_int_to_str, topic_for_notice_type_str
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Number of bytes read for the fixed-size binary portion of each record.
bin_len = 160
# Network-byte-order (big-endian) signed 4-byte integer. Used to decode the
# length prefixes of the VOEvent and text portions, and the notice type stored
# at the start of the binary portion.
int4 = struct.Struct("!l")
# Notice types that process() logs but does not forward to Kafka.
ignore_notice_types = {
gcn.NoticeType.IM_ALIVE,
gcn.NoticeType.VOE_11_IM_ALIVE,
gcn.NoticeType.VOE_20_IM_ALIVE,
}
def client_connected(producer: confluent_kafka.Producer, timeout: float = 90):
    """Create an asyncio stream callback that relays received notices to Kafka.

    Parameters
    ----------
    producer
        Kafka producer used to publish the binary, VOEvent, and text flavors
        of every notice that is not in ``ignore_notice_types``.
    timeout
        Maximum number of seconds to wait for one complete record to arrive.

    Returns
    -------
    client_connected_cb
        Coroutine function taking ``(reader, writer)``. It processes records
        in a loop until reading or processing raises; the connection is then
        closed and the exception propagates to the caller.
    """

    async def client_connected_cb(
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ):
        """Serve a single client connection until an error ends the loop."""

        async def read():
            """Read one record from the stream.

            A record is ``bin_len`` bytes of binary data followed by two
            length-prefixed payloads (VOEvent, then text), each preceded by
            an ``int4`` byte count.
            """
            bin_data = await reader.readexactly(bin_len)
            (voe_len,) = int4.unpack(await reader.readexactly(int4.size))
            voe_data = await reader.readexactly(voe_len)
            (txt_len,) = int4.unpack(await reader.readexactly(int4.size))
            txt_data = await reader.readexactly(txt_len)
            log.debug("Read %d + %d + %d bytes", bin_len, voe_len, txt_len)
            return bin_data, voe_data, txt_data

        async def process():
            """Read one record and publish each of its flavors to Kafka."""
            # Bound the whole read so that a stalled peer cannot hold the
            # connection open indefinitely.
            bin_data, voe_data, txt_data = await asyncio.wait_for(read(), timeout)
            # Incremented for every record received, including ignored ones.
            metrics.iamalive.inc()

            # The notice type is the first int4 of the binary data.
            (bin_notice_type,) = int4.unpack_from(bin_data)
            log.info("Received notice of type 0x%08X", bin_notice_type)
            if bin_notice_type in ignore_notice_types:
                return

            voe = lxml.etree.fromstring(voe_data)
            voe_notice_type = gcn.handlers.get_notice_type(voe)

            if bin_notice_type != voe_notice_type:
                log.warning(
                    "Binary (0x%08X) and VOEvent (0x%08X) notice types differ",
                    bin_notice_type,
                    voe_notice_type,
                )

            # The text notices do not contain a machine-readable notice type.
            txt_notice_type = bin_notice_type

            for notice_type_int, data, flavor in [
                [bin_notice_type, bin_data, "binary"],
                [voe_notice_type, voe_data, "voevent"],
                [txt_notice_type, txt_data, "text"],
            ]:
                notice_type_str = notice_type_int_to_str(notice_type_int)
                metrics.received.labels(notice_type_int, notice_type_str, flavor).inc()
                topic = topic_for_notice_type_str(notice_type_str, flavor)
                producer.produce(topic, data)

            # Serve any delivery report callbacks that are already pending.
            # The zero timeout makes this call non-blocking, so it does not
            # wait for outstanding messages and does not stall the event loop.
            producer.poll(0)

        # "peername" is None when the transport could not determine it, and
        # is not a (host, port, ...) sequence for every socket family, so do
        # not unpack it blindly: an exception here would occur before the
        # try/finally below and leave the writer unclosed.
        peername = writer.get_extra_info("peername")
        if isinstance(peername, (tuple, list)) and peername:
            peer = peername[0]
        else:
            peer = peername
        log.info("Client connected from %s", peer)
        try:
            with metrics.connected.track_inprogress():
                while True:
                    await process()
        finally:
            log.info("Closing connection from %s", peer)
            writer.close()
            await writer.wait_closed()

    return client_connected_cb