-
Notifications
You must be signed in to change notification settings - Fork 0
/
classes.py
113 lines (87 loc) · 3.09 KB
/
classes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os
import json
import uuid
import pika
class Connection:
def __init__(self):
host = os.environ.get("RABBITMQ_HOST")
usuario = os.environ.get("RABBITMQ_USER")
senha = os.environ.get("RABBITMQ_PSSWD")
porta = os.environ.get("RABBITMQ_PORT")
credentials = pika.PlainCredentials(usuario, senha)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host, porta, "/", credentials)
)
self.channel = self.connection.channel()
class Consumer(Connection):
def __init__(self, queue, callback):
super().__init__()
self.queue = queue
self.callback = callback
self.channel.queue_declare(queue=self.queue)
def start_server(self):
self.channel.basic_consume(
queue=self.queue, on_message_callback=self.callback, auto_ack=True
)
print('[*] Iniciando consumo da queue "%s"...' % self.queue)
print("[*] Aguardando mensagens. Para sair pressione CTRL+C")
self.channel.start_consuming()
@staticmethod
def byte2str(data):
return data.decode("utf-8")
@staticmethod
def byte2dict(data):
data = Consumer.byte2str(data)
data = data.replace("'", '"')
return json.loads(data)
class Publisher(Connection):
def __init__(self, routing_key):
super().__init__()
self.routing_key = routing_key
self.channel.queue_declare(queue=self.routing_key)
def publish(self, body=""):
print('[*] Publicando mensagem na queue "%s"...' % self.routing_key)
if isinstance(body, dict):
body = str(body)
self.channel.basic_publish(
exchange="",
routing_key=self.routing_key,
body=body,
)
def config_rpc(self):
queue_info = self.channel.queue_declare(queue="", exclusive=True)
self.callback_queue = queue_info.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self._on_response,
auto_ack=True,
)
def _on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body.decode("utf-8")
def rpc_publish(self, body=""):
print('[*] Publicando mensagem na queue "%s"...' % self.routing_key)
if isinstance(body, dict):
body = str(body)
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange="",
routing_key=self.routing_key,
body=body,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
)
while self.response is None:
self.connection.process_data_events()
return self.response
@staticmethod
def byte2str(data):
return data.decode("utf-8")
@staticmethod
def byte2dict(data):
data = Consumer.byte2str(data)
data = data.replace("'", '"')
return json.loads(data)