This repository has been archived by the owner on Oct 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 54
/
Copy pathcommon.py
347 lines (298 loc) · 11.8 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals
import datetime
import calendar
import json
import six
from uamqp import Message, BatchMessage
from uamqp import types, constants, errors
from uamqp.message import MessageHeader, MessageProperties
# AMQP error conditions that `_error_handler` treats as non-retryable: when
# `error.condition` equals one of these it returns ErrorAction(retry=False).
_NO_RETRY_ERRORS = (
b"com.microsoft:argument-out-of-range",
b"com.microsoft:entity-disabled",
b"com.microsoft:auth-failed",
b"com.microsoft:precondition-failed",
b"com.microsoft:argument-error"
)
def _error_handler(error):
    """
    Map a send failure onto the retry action the sender should take.

    The decision is driven purely by ``error.condition``:

    - ``server-busy`` and ``container-close`` are retried with a backoff of 4.
    - ``timeout`` is retried with a backoff of 2.
    - ``operation-cancelled`` is retried without an explicit backoff.
    - Any condition listed in ``_NO_RETRY_ERRORS`` is not retried.
    - Every other condition is retried without an explicit backoff.

    :param error: The error received in the send attempt.
    :type error: Exception
    :rtype: ~uamqp.errors.ErrorAction
    """
    condition = error.condition

    if condition == b'com.microsoft:operation-cancelled':
        return errors.ErrorAction(retry=True)

    if condition == b'com.microsoft:timeout':
        backoff = 2
    elif condition == b'com.microsoft:server-busy' or condition == b"com.microsoft:container-close":
        backoff = 4
    else:
        # Unknown conditions are retried; only the explicit deny-list is final.
        should_retry = condition not in _NO_RETRY_ERRORS
        return errors.ErrorAction(retry=should_retry)

    return errors.ErrorAction(retry=True, backoff=backoff)
def parse_sas_token(sas_token):
    """Parse a SAS token into its components.

    Everything up to and including the first space of ``sas_token`` is
    discarded; the remainder is split on ``&`` into ``key=value`` fields.
    Keys are lower-cased and values are returned verbatim (no decoding is
    applied). A value may itself contain ``=``: only the first ``=`` of a
    field separates key from value.

    Empty fields (for example those produced by a trailing ``&``, or an
    empty token after the first space) are ignored rather than causing a
    failure.

    :param sas_token: The SAS token.
    :type sas_token: str
    :rtype: dict[str, str]
    :raises ValueError: If a non-empty field does not contain ``=``.
    """
    sas_data = {}
    token = sas_token.partition(' ')[2]
    for field in token.split('&'):
        if not field:
            # Stray separators carry no data; skip them instead of crashing.
            continue
        key, separator, value = field.partition('=')
        if not separator:
            # Do not echo the field: it may contain secret token material.
            raise ValueError("SAS token contains a field without '='.")
        sas_data[key.lower()] = value
    return sas_data
class EventData(object):
    """
    The EventData class is a holder of event content.
    Acts as a wrapper to an uamqp.message.Message object.
    """

    # Message annotation keys read by the accessor properties below.
    PROP_SEQ_NUMBER = b"x-opt-sequence-number"
    PROP_OFFSET = b"x-opt-offset"
    PROP_PARTITION_KEY = b"x-opt-partition-key"
    PROP_TIMESTAMP = b"x-opt-enqueued-time"
    PROP_DEVICE_ID = b"iothub-connection-device-id"

    def __init__(self, body=None, batch=None, to_device=None, message=None):
        """
        Initialize EventData.

        Exactly one source of content is used, checked in this order:
        ``batch``, then ``message``, then ``body``.

        :param body: The data to send in a single message. A non-empty list
         produces one message whose body holds every list element.
        :type body: str, bytes or list
        :param batch: A data generator to send batched messages.
        :type batch: Generator
        :param to_device: An IoT device to route to.
        :type to_device: str
        :param message: The received message.
        :type message: ~uamqp.message.Message
        :raises ValueError: If none of ``batch``, ``message`` or ``body``
         is supplied.
        """
        self._partition_key = types.AMQPSymbol(EventData.PROP_PARTITION_KEY)
        self._annotations = {}
        self._app_properties = {}
        self.msg_properties = MessageProperties()
        if to_device:
            self.msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device)
        if batch:
            self.message = BatchMessage(data=batch, multi_messages=True, properties=self.msg_properties)
        elif message:
            # Wrapping a received message: adopt its properties and
            # annotations instead of the freshly created defaults.
            # NOTE(review): the values are taken as-is; if the message exposes
            # None for annotations, the dict-based accessors below would
            # fail - confirm against uamqp.
            self.message = message
            self.msg_properties = message.properties
            self._annotations = message.annotations
            self._app_properties = message.application_properties
        else:
            if isinstance(body, list) and body:
                # The first element creates the message, the remaining
                # elements are appended to the same message body.
                self.message = Message(body[0], properties=self.msg_properties)
                for more in body[1:]:
                    self.message._body.append(more)  # pylint: disable=protected-access
            elif body is None:
                raise ValueError("EventData cannot be None.")
            else:
                self.message = Message(body, properties=self.msg_properties)

    @property
    def sequence_number(self):
        """
        The sequence number of the event data object.

        ``None`` when the annotation is not present.

        :rtype: int or long
        """
        return self._annotations.get(EventData.PROP_SEQ_NUMBER, None)

    @property
    def offset(self):
        """
        The offset of the event data object.

        ``None`` when the annotation is missing or cannot be decoded.

        :rtype: ~azure.eventhub.common.Offset
        """
        try:
            return Offset(self._annotations[EventData.PROP_OFFSET].decode('UTF-8'))
        except (KeyError, AttributeError):
            return None

    @property
    def enqueued_time(self):
        """
        The enqueued timestamp of the event data object.

        The annotation value is interpreted as milliseconds since the Unix
        epoch and converted to a naive UTC datetime. ``None`` is returned
        only when the annotation is absent.

        :rtype: datetime.datetime
        """
        timestamp = self._annotations.get(EventData.PROP_TIMESTAMP, None)
        # Compare against None explicitly: a timestamp of 0 is a valid value
        # (the epoch) and must not be reported as missing.
        if timestamp is not None:
            return datetime.datetime.utcfromtimestamp(float(timestamp)/1000)
        return None

    @property
    def device_id(self):
        """
        The device ID of the event data object. This is only used for
        IoT Hub implementations.

        ``None`` when the annotation is not present.

        :rtype: bytes
        """
        return self._annotations.get(EventData.PROP_DEVICE_ID, None)

    @property
    def partition_key(self):
        """
        The partition key of the event data object.

        The annotation is looked up under the AMQP symbol key first (the
        form written by the setter) and then under the raw bytes key.
        ``None`` when neither is present.

        :rtype: bytes
        """
        try:
            return self._annotations[self._partition_key]
        except KeyError:
            return self._annotations.get(EventData.PROP_PARTITION_KEY, None)

    @partition_key.setter
    def partition_key(self, value):
        """
        Set the partition key of the event data object.

        Besides storing the annotation, this replaces the message header
        with a new one marked durable.

        :param value: The partition key to set.
        :type value: str or bytes
        """
        # Work on a copy so the previously assigned mapping is not mutated.
        annotations = dict(self._annotations)
        annotations[self._partition_key] = value
        header = MessageHeader()
        header.durable = True
        self.message.annotations = annotations
        self.message.header = header
        self._annotations = annotations

    @property
    def application_properties(self):
        """
        Application defined properties on the message.

        :rtype: dict
        """
        return self._app_properties

    @application_properties.setter
    def application_properties(self, value):
        """
        Application defined properties on the message.

        The supplied mapping is kept on the event and a copy of it is
        assigned to the underlying message.

        :param value: The application properties for the EventData.
        :type value: dict
        """
        self._app_properties = value
        properties = dict(self._app_properties)
        self.message.application_properties = properties

    @property
    def body(self):
        """
        The body of the event data object.

        :rtype: bytes or Generator[bytes]
        :raises ValueError: If the underlying message raises ``TypeError``
         while retrieving its data.
        """
        try:
            return self.message.get_data()
        except TypeError:
            raise ValueError("Message data empty.")

    def body_as_str(self, encoding='UTF-8'):
        """
        The body of the event data as a string if the data is of a
        compatible type.

        The body is first treated as an iterable of byte chunks that are
        decoded and joined. If it is not iterable it is converted with
        ``six.text_type``. Otherwise it is decoded as a single bytes value.

        :param encoding: The encoding to use for decoding message data.
         Default is 'UTF-8'
        :rtype: str or unicode
        :raises TypeError: If the body cannot be represented as a string.
        """
        data = self.body
        try:
            return "".join(b.decode(encoding) for b in data)
        except TypeError:
            # Not iterable: fall back to the plain text conversion.
            return six.text_type(data)
        except Exception:  # pylint: disable=broad-except
            # Iterable, but its items are not decodable chunks (e.g. a single
            # bytes value); try decoding it as a whole below. Only Exception
            # is caught so KeyboardInterrupt/SystemExit still propagate.
            pass
        try:
            return data.decode(encoding)
        except Exception as e:
            raise TypeError("Message data is not compatible with string type: {}".format(e))

    def body_as_json(self, encoding='UTF-8'):
        """
        The body of the event loaded as a JSON object if the data is compatible.

        :param encoding: The encoding to use for decoding message data.
         Default is 'UTF-8'
        :rtype: dict
        :raises TypeError: If the body is not a string or not valid JSON.
        """
        data_str = self.body_as_str(encoding=encoding)
        try:
            return json.loads(data_str)
        except Exception as e:
            raise TypeError("Event data is not compatible with JSON type: {}".format(e))
class Offset(object):
    """
    A starting position (stream offset, sequence number or timestamp) for a
    receiver. The type of the wrapped value decides how it is interpreted.
    Examples:

    Beginning of the event stream:
      >>> offset = Offset("-1")
    End of the event stream:
      >>> offset = Offset("@latest")
    Events after the specified offset:
      >>> offset = Offset("12345")
    Events from the specified offset:
      >>> offset = Offset("12345", True)
    Events after a datetime:
      >>> offset = Offset(datetime.datetime.utcnow())
    Events after a specific sequence number:
      >>> offset = Offset(1506968696002)
    """

    def __init__(self, value, inclusive=False):
        """
        Store the position and whether the position itself is included.

        :param value: The offset value.
        :type value: ~datetime.datetime or int or str
        :param inclusive: Whether to include the supplied value as the start point.
        :type inclusive: bool
        """
        self.inclusive = inclusive
        self.value = value

    def selector(self):
        """
        Build the filter expression that selects events from this position.

        A datetime filters on the enqueued time (in milliseconds), an
        integer on the sequence number, and anything else on the offset.

        :rtype: bytes
        """
        comparison = ">" if not self.inclusive else ">="
        position = self.value

        if isinstance(position, datetime.datetime):
            # Whole seconds come from the UTC time tuple; the sub-second part
            # is added back from the microsecond field.
            epoch_seconds = calendar.timegm(position.utctimetuple())
            millis = (epoch_seconds * 1000) + (position.microsecond/1000)
            template = "amqp.annotation.x-opt-enqueued-time {} '{}'"
            operand = int(millis)
        elif isinstance(position, six.integer_types):
            template = "amqp.annotation.x-opt-sequence-number {} '{}'"
            operand = position
        else:
            template = "amqp.annotation.x-opt-offset {} '{}'"
            operand = position

        expression = template.format(comparison, operand)
        return expression.encode('utf-8')
class EventHubError(Exception):
    """
    Represents an error happened in the client.

    :ivar message: The error message.
    :vartype message: str
    :ivar error: The error condition, if available.
    :vartype error: str
    :ivar details: The error details, if included in the
     service response.
    :vartype details: dict[str, str]
    """

    def __init__(self, message, details=None):
        """
        Build the error from a message and optional details.

        When ``details`` is an exception, its ``condition`` (if it has a
        usable one) is recorded in ``self.error`` and its ``description`` is
        parsed into the final message and detail list. If the description is
        missing or cannot be parsed, the string form of ``details`` is
        appended to the message instead.

        :param message: The error message, or a send result to describe.
        :type message: str or ~uamqp.constants.MessageSendResult
        :param details: Additional information about the failure.
        :type details: Exception or any
        """
        self.error = None
        self.message = message
        self.details = details
        if isinstance(message, constants.MessageSendResult):
            self.message = "Message send failed with result: {}".format(message)
        if details and isinstance(details, Exception):
            # The condition is either an object exposing `.value` (bytes) or
            # plain bytes. A generic exception may have neither; that must
            # not prevent the error object from being created.
            try:
                condition = details.condition.value.decode('UTF-8')
            except AttributeError:
                try:
                    condition = details.condition.decode('UTF-8')
                except AttributeError:
                    condition = None
            if condition:
                # Keep only the part after the namespace prefix.
                _, _, self.error = condition.partition(':')
                self.message += "\nError: {}".format(self.error)
            try:
                # On success _parse_error replaces self.message with the
                # description text and self.details with the parsed list.
                self._parse_error(details.description)
                for detail in self.details:
                    self.message += "\n{}".format(detail)
            except Exception:  # pylint: disable=broad-except
                # Best effort: any parsing problem falls back to the raw
                # details. Only Exception is caught so that
                # KeyboardInterrupt/SystemExit are not swallowed.
                self.message += "\n{}".format(details)
        super(EventHubError, self).__init__(self.message)

    def _parse_error(self, error_list):
        """
        Split an error description into the message and its detail fields.

        ``self.message`` is replaced by the description text. If the text
        contains a " Reference:" section, that section is removed from the
        message and split at the ", TrackingId:", ", SystemTracker:" and
        ", Timestamp:" markers into four detail strings. ``self.details`` is
        replaced by the resulting list, which is empty when there is no
        " Reference:" section.

        :param error_list: The error description.
        :type error_list: str or bytes
        :raises ValueError: If a " Reference:" section is present but one of
         the expected markers is missing.
        """
        details = []
        self.message = error_list if isinstance(error_list, six.text_type) else error_list.decode('UTF-8')
        details_index = self.message.find(" Reference:")
        if details_index >= 0:
            # Skip the leading space of the " Reference:" marker.
            details_msg = self.message[details_index + 1:]
            self.message = self.message[0:details_index]

            tracking_index = details_msg.index(", TrackingId:")
            system_index = details_msg.index(", SystemTracker:")
            timestamp_index = details_msg.index(", Timestamp:")
            # The "+ 2" drops the ", " separator in front of each field.
            details.append(details_msg[:tracking_index])
            details.append(details_msg[tracking_index + 2: system_index])
            details.append(details_msg[system_index + 2: timestamp_index])
            details.append(details_msg[timestamp_index + 2:])
        self.details = details