forked from narusemotoki/parsely
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtests.py
297 lines (224 loc) · 10.3 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
import json
import logging
import os
import pkg_resources
import sqlite3
import unittest.mock
import celery
import falcon
import pytest
import parsely
import parsely.database
import parsely.retry
# We don't need actual celery for testing: replace the Celery class with a mock so that
# code instantiating celery.Celery after this point gets a MagicMock back.
celery.Celery = unittest.mock.MagicMock()
# Point the database at a throwaway name; the teardown methods below remove 'test.db'.
parsely.database.db.dbname = 'test.db'
# Disable logger: raise parsely's logger threshold to CRITICAL for the test run.
parsely.logger.setLevel(logging.CRITICAL)
def task_for_test(text: str, number: int):
    # Intentionally a no-op: the tests register this function as a task and only
    # inspect its parameter annotations (TestParsely.test_task expects exactly
    # ('number', int) and ('text', str)), so do not add or change annotations here.
    pass
class TestParsely:
    """Tests for task registration, retry handling and producer creation on parsely.Parsely."""

    def setup_method(self, method):
        # A fresh instance per test keeps task registrations from leaking between tests.
        self.parsely = parsely.Parsely('test_queue', 'test_broker')

    def test_task(self):
        """Registering a task stores its annotated parameters as validations."""
        self.parsely.task()(task_for_test)

        (_, validations), preprocessors = self.parsely._tasks['task_for_test']

        assert len(preprocessors) == 0
        # Compare the complete lists. Pairing the items with zip() would stop at the
        # shorter input, so a missing (or empty) validation list would pass unnoticed.
        assert sorted(validations, key=lambda x: x[0]) == [('number', int), ('text', str)]

    def test_register_same_task(self):
        """Registering the same function twice raises ParselyError."""
        self.parsely.task()(task_for_test)

        with pytest.raises(parsely.ParselyError):
            self.parsely.task()(task_for_test)

    def test_queue_name_startw_with__(self):
        """A queue name beginning with an underscore is rejected."""
        with pytest.raises(parsely.ParselyError):
            parsely.Parsely('_queue', 'test_broker')

    def test_retry(self):
        """With a retry policy, a failing task calls retry on the celery task."""
        def task_for_retry():
            raise Exception

        with unittest.mock.patch.object(self.parsely, 'celery') as mock_celery:
            # Capture the handler that gets passed to celery.task so it can be
            # invoked directly below.
            def mock_task(handle, bind):
                self.handle = handle

            mock_celery.task.side_effect = mock_task
            self.parsely.task(retry_policy=parsely.retry.FibonacciWait(1))(task_for_retry)

            mock_celery_task = unittest.mock.MagicMock()
            self.handle(mock_celery_task)

            assert mock_celery_task.retry.called

    def test_no_retry(self):
        """Without a retry policy, the task's exception propagates and retry is not called."""
        def task_for_retry():
            raise Exception

        with unittest.mock.patch.object(self.parsely, 'celery') as mock_celery:
            def mock_task(handle, bind):
                self.handle = handle

            mock_celery.task.side_effect = mock_task
            self.parsely.task()(task_for_retry)

            mock_celery_task = unittest.mock.MagicMock()
            with pytest.raises(Exception):
                self.handle(mock_celery_task)

            # Kept outside the pytest.raises block: statements placed after the
            # raising call inside that block would never execute.
            assert not mock_celery_task.retry.called

    def test_make_producer(self):
        """make_producer returns a falcon API object."""
        assert isinstance(self.parsely.make_producer(), falcon.api.API)
class TestProducer:
    """Tests for parsely.Producer: payload validation in on_post and the on_get response."""

    def setup_method(self, method):
        self.parsely = parsely.Parsely('test_queue', 'test_broker')
        self.parsely.task()(task_for_test)
        self.producer = parsely.Producer(
            parsely.HTMLRendler(), True, 'test_queue', self.parsely._tasks)
        self.mock_req = unittest.mock.MagicMock()
        self.mock_resp = unittest.mock.MagicMock()
        # Run the migrations and reconnect before every test; teardown_method
        # closes the connection and deletes the database file again.
        parsely.database.Migrator(parsely.__version__).migrate()
        parsely.database.db.reconnect()

    def teardown_method(self, method):
        parsely.database.db.get().close()
        if os.path.exists('test.db'):
            os.remove('test.db')

    # NOTE: in the tests below the title assertions sit after the pytest.raises
    # block on purpose; inside the block they would be skipped once on_post raises.

    def test_undefined_task(self):
        """Posting to a task name that was never registered is a bad request."""
        with pytest.raises(falcon.HTTPBadRequest) as e:
            self.producer.on_post(self.mock_req, self.mock_resp, 'undefined_task')

        assert e.value.title == "Undefined task"

    def test_empty_payload(self):
        """An empty request body is a bad request."""
        self.mock_req.stream.read.return_value = b""

        with pytest.raises(falcon.HTTPBadRequest) as e:
            self.producer.on_post(self.mock_req, self.mock_resp, 'task_for_test')

        assert e.value.title == "Empty payload"

    def test_non_json_payload(self):
        """A body that cannot be parsed as JSON is a bad request."""
        self.mock_req.stream.read.return_value = b"This is not a JSON"

        with pytest.raises(falcon.HTTPBadRequest) as e:
            self.producer.on_post(self.mock_req, self.mock_resp, 'task_for_test')

        assert e.value.title == "Payload is not a JSON"

    def test_lack_message(self):
        """A JSON body without the 'message' key is a bad request."""
        self.mock_req.stream.read.return_value = b"{}"

        with pytest.raises(falcon.HTTPBadRequest) as e:
            self.producer.on_post(self.mock_req, self.mock_resp, 'task_for_test')

        assert e.value.title == "Invalid JSON"

    def test_preprocessor_lack_requirements(self):
        """A message missing a field the preprocessor declares is a bad request."""
        def preprocessor_for_preprocessor_test(text: str):
            pass

        @self.parsely.task(preprocessor_for_preprocessor_test)
        def task_for_preprocessor_test():
            pass

        self.mock_req.stream.read.return_value = json.dumps({
            'message': {}
        }).encode()

        with pytest.raises(falcon.HTTPBadRequest) as e:
            self.producer.on_post(
                self.mock_req, self.mock_resp, 'task_for_preprocessor_test')

        # The spelling "filed" is the title this test has always expected from
        # the library; it must be matched verbatim.
        assert e.value.title == "Missing required filed"

    def test_preprocessor_invalid_type(self):
        """A field whose value does not match the preprocessor's annotation is a bad request."""
        def preprocessor_for_preprocessor_test(text: str):
            pass

        @self.parsely.task(preprocessor_for_preprocessor_test)
        def task_for_preprocessor_test():
            pass

        self.mock_req.stream.read.return_value = json.dumps({
            'message': {
                'text': 1
            }
        }).encode()

        with pytest.raises(falcon.HTTPBadRequest) as e:
            self.producer.on_post(
                self.mock_req, self.mock_resp, 'task_for_preprocessor_test')

        assert e.value.title == "Invalid type"

    def test_task(self):
        """A valid message passes through the preprocessor and the task is enqueued."""
        def preprocessor_for_preprocessor_test(number: int):
            return {
                'text': str(number)
            }

        @self.parsely.task(preprocessor_for_preprocessor_test)
        def task_for_preprocessor_test(text: str):
            pass

        self.mock_req.stream.read.return_value = json.dumps({
            'message': {
                'number': 1
            }
        }).encode()

        self.producer.on_post(
            self.mock_req, self.mock_resp, 'task_for_preprocessor_test')

        assert self.parsely._tasks['task_for_preprocessor_test'][0][0].apply_async.called

    def test_on_get(self):
        """on_get responds with an HTML content type."""
        self.producer.on_get(self.mock_req, self.mock_resp, 'task_for_test')

        # This used to be a plain assignment to the mock, which can never fail.
        # Assert the value instead, as TestListTaskResource.test_on_get does.
        assert self.mock_resp.content_type == 'text/html'
class TestStaticResource:
    """Tests for parsely.StaticResource.on_get in packaged and non-packaged mode."""

    @unittest.mock.patch.object(pkg_resources, "resource_filename")
    def test_on_get_for_installed(self, mock_resource_filename):
        """With is_packaged set, the file located via pkg_resources is served as JavaScript."""
        resourcename = "parsely.js"
        # Make the patched pkg_resources lookup resolve to the resource file
        # that lives next to this test module.
        mock_resource_filename.return_value = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "parsely", "resources", resourcename
        )
        static_resource = parsely.StaticResource()
        static_resource.is_packaged = True
        mock_resp = unittest.mock.MagicMock()

        static_resource.on_get(unittest.mock.MagicMock(), mock_resp, resourcename)

        # This used to assign to the mock instead of checking it, so the test
        # verified nothing; compare the value the handler set.
        assert mock_resp.content_type == "application/javascript"

    def test_on_get_for_not_installed(self):
        """With is_packaged cleared, the resource is still served as JavaScript."""
        static_resource = parsely.StaticResource()
        static_resource.is_packaged = False
        mock_resp = unittest.mock.MagicMock()

        static_resource.on_get(unittest.mock.MagicMock(), mock_resp, "parsely.js")

        # Assert rather than assign, for the same reason as in the packaged test.
        assert mock_resp.content_type == "application/javascript"
class TestMigrator:
    """Tests for the error paths of parsely.database.Migrator.migrate."""

    def teardown_method(self, method):
        # Close the connection first, then drop the database file if one was created.
        connection = parsely.database.db.get()
        connection.close()
        if os.path.exists('test.db'):
            os.remove('test.db')

    def test__raise_for_invalid_version(self):
        """Migrating to version '0' after migrating to the current version raises ParselyError."""
        current = parsely.database.Migrator(parsely.__version__)
        current.migrate()

        older = parsely.database.Migrator('0')
        with pytest.raises(parsely.ParselyError):
            older.migrate()

    def test__run_migration_sql_file(self):
        """A migration pointing at test_resources/invalid.sql raises ParselyError."""
        invalid_sql_path = os.path.join("test_resources", "invalid.sql")

        def only_invalid_sql(x):
            # Stand-in for Migrator._iter_diff that yields just the broken SQL file.
            return [invalid_sql_path]

        migrator = parsely.database.Migrator(parsely.__version__)
        migrator._iter_diff = only_invalid_sql

        with pytest.raises(parsely.ParselyError):
            migrator.migrate()
class TestDBManager:
    """Tests for parsely.DBManager driven through a mocked connection manager."""

    def setup_method(self, method):
        self.mock_connection_manager = unittest.mock.MagicMock()
        self.db_manager = parsely.DBManager(self.mock_connection_manager)

    def _process_response(self, succeeded):
        # Call process_response with throwaway mocks for the three leading
        # arguments and the given success flag as the fourth.
        placeholders = [unittest.mock.MagicMock() for _ in range(3)]
        self.db_manager.process_response(*placeholders, succeeded)

    def test_process_resource(self):
        """process_resource calls reconnect on the connection manager."""
        placeholders = [unittest.mock.MagicMock() for _ in range(4)]

        self.db_manager.process_resource(*placeholders)

        assert self.mock_connection_manager.reconnect.called

    def test_process_response_without_connection(self):
        """When get() returns None, neither commit nor rollback is called on the manager."""
        self.mock_connection_manager.get = unittest.mock.MagicMock(return_value=None)

        self._process_response(True)

        manager = self.mock_connection_manager
        assert not manager.commit.called
        assert not manager.rollback.called

    def test_process_response_succeeded(self):
        """A successful response commits on the connection and does not roll back."""
        self._process_response(True)

        connection = self.mock_connection_manager.get.return_value
        assert connection.commit.called
        assert not connection.rollback.called

    def test_process_response_failed(self):
        """A failed response rolls back on the connection and does not commit."""
        self._process_response(False)

        connection = self.mock_connection_manager.get.return_value
        assert not connection.commit.called
        assert connection.rollback.called

    def test_process_response_failed_with_closed_connection(self):
        """A sqlite3.ProgrammingError raised by rollback does not escape process_response."""
        failing_rollback = unittest.mock.MagicMock(side_effect=sqlite3.ProgrammingError)
        self.mock_connection_manager.get.return_value.rollback = failing_rollback

        # No assertion needed: the test passes as long as nothing is raised here.
        self._process_response(False)
class TestListTaskResource:
    """Tests for parsely.TaskListResource.on_get."""

    def setup_method(self, method):
        # An empty task mapping is enough; only the response content type is checked.
        rendler = parsely.HTMLRendler()
        self.resource = parsely.TaskListResource(rendler, 'test', {})

    def test_on_get(self):
        """on_get sets the response content type to text/html."""
        response = unittest.mock.MagicMock()
        request = unittest.mock.MagicMock()

        self.resource.on_get(request, response)

        assert response.content_type == 'text/html'
def test_fibonacci_wait():
    """FibonacciWait exposes its retry limit, uses countdown, and waits 1, 2, 3, 5, 8, ..."""
    expected_waits = [1, 2, 3, 5, 8, 13, 21, 34, 55, 89]
    max_retries = 10

    policy = parsely.retry.FibonacciWait(max_retries)

    assert policy.max_retries == max_retries
    assert policy.retry_method == parsely.retry.RetryMethod.countdown
    # The first argument to countdown is the zero-based attempt index.
    for attempt, wait in enumerate(expected_waits):
        assert policy.countdown(attempt, None) == wait