-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathflask_prism.py
373 lines (280 loc) · 13.7 KB
/
flask_prism.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
from functools import wraps
import inspect
import json
from flask import Response, current_app, request
# Names exported by `from flask_prism import *`. ResponseEvaluator is defined
# in this module but is intentionally not listed here.
__all__ = ('Refract', 'Prism', 'ResponseMapper')
class Refract(Response):
    """
    Response that serialises one or more objects to JSON using the PRISM
    representations registered on ``current_app.ext_prism``.

    Plain ``dict`` objects are passed through as they are; any other object is
    converted by the representation function mapped for its class name (and for
    the ``prism_version`` of the current blueprint, when one is set).
    """

    STATUS_OK = 200
    DEFAULT_MIMETYPE = 'text/json'
    PRISM_VERSION_ATTRIBUTE = 'prism_version'
    PRISM_MIMETYPE_ATTRIBUTE = 'prism_mimetype'

    # Internal marker returned by _prism_resolve when an entry must be dropped
    # from its parent dict/list.
    _PRISM_REMOVE = object()

    def __init__(self, objects, status=STATUS_OK, as_list=False, mimetype=None, version=None, headers=None):
        """
        :param objects: A single object or a list of objects to serialise
        :param status: Integer status code, or a status value assigned to ``self.status`` otherwise
        :param as_list: When True the body is always a JSON list, even for a single object
        :param mimetype: Explicit mimetype; takes precedence over the representation's mimetype
        :param version: Accepted for compatibility; not used by this class
        :param headers: Optional headers for the response
        """
        # Let the base class build its own headers container from the given
        # headers rather than replacing ``self.headers`` with the caller's object.
        super(Refract, self).__init__(headers=headers)

        self.data_objects = objects if isinstance(objects, list) else [objects]
        self.as_list = as_list

        # Mimetype precedence: explicit argument, then the mimetype stored on
        # the representation function, then the class default.
        representation_mimetype = self.get_mimetype_representation()
        if mimetype is not None:
            self.mimetype = mimetype
        elif representation_mimetype is not None:
            self.mimetype = representation_mimetype
        else:
            self.mimetype = Refract.DEFAULT_MIMETYPE

        self.data = self.build_response()

        if isinstance(status, int):
            self.status_code = status
        else:
            self.status = status

    def get_mimetype_representation(self):
        """
        Return the mimetype stored on the first representation builder that
        carries a ``prism_mimetype`` attribute (the value may be None), or None
        when no builder carries one.
        """
        for data_object in self.data_objects:
            func = self.get_representation_builder(data_object)
            if hasattr(func, Refract.PRISM_MIMETYPE_ATTRIBUTE):
                return getattr(func, Refract.PRISM_MIMETYPE_ATTRIBUTE)
        return None

    def get_representation_builder(self, prism_object, soft_fail=False):
        """
        Look up the representation function mapped for ``prism_object``'s class.

        :param prism_object: The object to find a representation for
        :param soft_fail: When True, return None instead of raising if nothing is mapped
        :return: The mapped function; the object itself if it is an unmapped dict;
                 or None when nothing is mapped and ``soft_fail`` is True
        :raises Exception: If nothing is mapped, the object is not a dict and ``soft_fail`` is False
        """
        class_name = prism_object.__class__.__name__

        # The version comes from the blueprint serving this request, if any.
        version = None
        if request.blueprint is not None:
            blueprint = current_app.blueprints[request.blueprint]
            version = getattr(blueprint, Refract.PRISM_VERSION_ATTRIBUTE, None)

        func = current_app.ext_prism.lookup_mappings(class_name, version=version)
        if func is None:
            if isinstance(prism_object, dict):
                return prism_object
            if not soft_fail:
                raise Exception('Issue retrieving stored function reference for PRISM mapping on %s object. '
                                'Does this object have an api_response/is the right version defined?' % class_name)
        return func

    def get_representation_dict(self, prism_object):
        """
        Build the serialisable structure for ``prism_object``.

        A dict is used as it is (and is modified in place); any other object is
        passed to its representation builder. The result is then walked so that
        ResponseEvaluator placeholders and nested mapped objects are resolved.
        """
        if isinstance(prism_object, dict):
            resp = prism_object
        else:
            resp = self.get_representation_builder(prism_object)(prism_object)
        return self._prism_evaluate_level(resp, prism_object)

    def _prism_evaluate_level(self, items, prism_object):
        """Resolve every entry of a dict or list in place; other values are returned untouched."""
        if isinstance(items, dict):
            # Iterate over a snapshot: keys may be removed while walking.
            for key, value in list(items.items()):
                resolved = self._prism_resolve(value, prism_object)
                if resolved is Refract._PRISM_REMOVE:
                    items.pop(key, None)
                else:
                    items[key] = resolved
        elif isinstance(items, list):
            # Build the surviving entries first, then swap them in, so that a
            # removal never causes the following element to be skipped.
            kept = []
            for value in items:
                resolved = self._prism_resolve(value, prism_object)
                if resolved is not Refract._PRISM_REMOVE:
                    kept.append(resolved)
            items[:] = kept
        return items

    def _prism_resolve(self, value, prism_object):
        """Return the serialisable form of one value, or ``_PRISM_REMOVE`` if it must be dropped."""
        if isinstance(value, (dict, list)):
            return self._prism_evaluate_level(value, prism_object)
        if isinstance(value, ResponseEvaluator.Killer):
            return Refract._PRISM_REMOVE
        if isinstance(value, ResponseEvaluator):
            # Evaluate against the object being represented, then resolve the
            # outcome like any other value (it may be a Killer, a container,
            # or an object with its own representation).
            return self._prism_resolve(value.evaluate_for_response(prism_object), prism_object)
        if self.get_representation_builder(value, soft_fail=True) is not None:
            return self.get_representation_dict(value)
        return value

    def build_response(self):
        """
        Serialise ``self.data_objects`` to a JSON string.

        Several objects, or ``as_list``, produce a JSON list; a single object
        produces its representation directly; no objects produce ``{}`` (or
        ``[]`` when ``as_list`` is set).
        """
        representations = [self.get_representation_dict(o) for o in self.data_objects]
        if self.as_list or len(representations) > 1:
            payload = representations
        elif representations:
            payload = representations[0]
        else:
            payload = {}
        return json.dumps(payload)
class Prism(object):
    """
    Extension object that registers representation and access functions for
    classes and resolves them again when a response is built.
    """

    def __init__(self, app=None):
        """
        :param app: Optional application; when given, this instance is stored on it as ``ext_prism``
        """
        self.app = app
        self.mapper = ResponseMapper()
        if app is not None:
            app.ext_prism = self

    def init_app(self, app):
        """Attach this instance to ``app`` as ``app.ext_prism``."""
        self.app = app
        app.ext_prism = self

    def get_calling_class_name(self):
        """
        Return the class name of the ``self`` object two frames up the stack,
        i.e. the instance whose method called the ``has_*`` helper that called
        this method.

        :raises KeyError: If that frame has no local named ``self``
        """
        # context=0: only the frame objects are needed, not source lines.
        stack = inspect.stack(0)
        try:
            return stack[2][0].f_locals["self"].__class__.__name__
        finally:
            # Drop the frame references promptly to avoid reference cycles.
            del stack

    def has_or_none(self, key, value, version=None):
        """
        Placeholder that resolves to ``value`` if the access check for ``key``
        passes, and to None otherwise.
        """
        # get_calling_class_name must be called directly from here: it relies
        # on the exact stack depth.
        r = ResponseEvaluator(self, self.get_calling_class_name(), key, ResponseEvaluator.MODE_NONE, value,
                              version=version)
        return r

    def has_or_exclude(self, key, value, version=None):
        """
        Placeholder that resolves to ``value`` if the access check for ``key``
        passes, and is removed from the response otherwise.
        """
        r = ResponseEvaluator(self, self.get_calling_class_name(), key, ResponseEvaluator.MODE_EXCLUDE, value,
                              version=version)
        return r

    def has_or_else(self, key, value, else_value, version=None):
        """
        Placeholder that resolves to ``value`` if the access check for ``key``
        passes, and to ``else_value`` otherwise.
        """
        r = ResponseEvaluator(self, self.get_calling_class_name(), key, ResponseEvaluator.MODE_ELSE, value,
                              version=version)
        r.alternative = else_value
        return r

    @staticmethod
    def _defining_class_name(frames, kind):
        """
        Extract the class name from the ``class`` statement found in the source
        context of ``frames[2]`` (the frame executing the class statement when a
        decorator runs inside a class body).

        :param frames: Result of ``inspect.stack()`` taken inside the decorator
        :param kind: Word used in the error message ('representation' or 'access')
        :raises Exception: If no ``class`` statement is found at that frame
        """
        statement = ''
        if len(frames) > 2:
            # The source context is None when the source is unavailable.
            context = frames[2][4]
            if context:
                statement = context[0].strip()
        if not statement.startswith('class '):
            raise Exception('PRISM %s methods must be defined in a class.' % kind)
        # "class Foo(Base):" / "class Foo:" -> "Foo"
        return statement[len('class '):].split('(')[0].split(':')[0].strip()

    def api_representation(self, version=None, mimetype=None):
        """
        Decorator registering a method as the representation of its class.

        :param version: The version of this representation as an integer
        :param mimetype: The final mimetype of this response, default is text/json
        :raises Exception: If the decorated function is not defined in a class body
        """
        def func_decorator(func):
            @wraps(func)
            def process(*args, **kwargs):
                return func(*args, **kwargs)

            # The stack must be captured here, in the decorator's own frame.
            class_name = self._defining_class_name(inspect.stack(), 'representation')
            # Store mimetype to function
            func.prism_mimetype = mimetype
            # Map the method to a format we know about
            self.mapper.map('%s/%s/rep/%s' % (class_name, version, func.__name__), func)
            return process
        return func_decorator

    def has_access(self, version=None):
        """
        Decorator registering a method as the access check of its class. The
        method is later called as ``func(instance, access_key)`` and must
        return a bool.

        :param version: The version this access method applies to
        :raises Exception: If the decorated function is not defined in a class body
        """
        def func_decorator(func):
            @wraps(func)
            def process(*args, **kwargs):
                return func(*args, **kwargs)

            # The stack must be captured here, in the decorator's own frame.
            class_name = self._defining_class_name(inspect.stack(), 'access')
            # Map the method to a format we know about
            self.mapper.map('%s/%s/acc/%s' % (class_name, version, func.__name__), func)
            return process
        return func_decorator

    def check_has_access(self, instance, access_reference, access_key, version=None):
        """
        Run the access method mapped for ``access_reference`` and return its result.

        :param instance: Object passed as first argument to the access method
        :param access_reference: Class name the access method was registered under
        :param access_key: Key passed as second argument to the access method
        :param version: Version of the access method to look up
        :raises Exception: If no access method is mapped, or it returns a non-bool
        """
        func = self.lookup_mappings(class_name=access_reference, version=version, type='acc')
        if func is None:
            raise Exception('Mapping not found for class %s, version %s, would have used access key %s' % (
                access_reference, version, access_key))

        result = func(instance, access_key)
        # A non-boolean result is useless for an access decision. Fail now.
        if not isinstance(result, bool):
            raise Exception('PRISM issue checking for access, expected boolean but got %s' % type(result))
        return result

    def __get_mapper(self):
        """Return the ResponseMapper holding the registered functions."""
        return self.mapper

    def lookup_mappings(self, class_name, version=None, type='rep'):
        """
        Return the function mapped for ``class_name``/``version``/``type``
        ('rep' or 'acc'), or None when nothing is mapped.
        """
        return self.mapper.return_for(class_name=class_name, version=version, type=type)
class ResponseEvaluator(object):
    """
    Placeholder put into a representation by prism's has_ methods. When the
    response is built it is swapped for either its value or an alternative,
    depending on the outcome of the access check.
    """

    MODE_NONE = 0
    MODE_EXCLUDE = 1
    MODE_ELSE = 2

    class Killer(object):
        """Marker meaning "drop this value (or key/value pair) from the response"."""

    def __init__(self, prism, access_reference, access_key, mode, value, version=None):
        self.prism = prism
        self.access_reference = access_reference
        self.access_key = access_key
        self.mode = mode
        self.value = value
        self.version = version
        # Only meaningful in MODE_ELSE; assigned by the caller after construction.
        self.alternative = None

    def get_alternative(self):
        """
        Return what replaces the value when access is denied: None, a Killer
        instance, or the stored alternative, according to ``self.mode``.

        :raises Exception: If ``self.mode`` is not one of the MODE_ constants
        """
        current_mode = self.mode
        if current_mode == ResponseEvaluator.MODE_NONE:
            return None
        if current_mode == ResponseEvaluator.MODE_EXCLUDE:
            return ResponseEvaluator.Killer()
        if current_mode == ResponseEvaluator.MODE_ELSE:
            return self.alternative
        raise Exception("Unrecognised mode for Response evaluator. Expected known constant, given %s" % self.mode)

    def evaluate_for_response(self, instance):
        """Return the value if the access check passes for ``instance``, else the alternative."""
        granted = self.prism.check_has_access(
            instance, self.access_reference, self.access_key, version=self.version)
        if granted:
            return self.value
        return self.get_alternative()
class ResponseMapper(object):
    """
    Registry of functions keyed by strings of the form
    ``<class name>/<version>/<type>/<function name>``.
    """

    def __init__(self):
        self.maps = {}

    def map(self, key, response):
        """
        Store ``response`` under ``key``.

        :raises Exception: If ``key`` is already registered
        """
        if key in self.maps:
            message = 'Map key "%s" overwrites existing mapping. Try renaming the new method and try again.' % key
            raise Exception(message)
        self.maps[key] = response

    def return_for(self, class_name, version, type):
        """
        Return the first registered function whose key starts with
        ``<class_name>/<version>/<type>``, or None if there is none.
        """
        prefix = '%s/%s/%s' % (class_name, version, type)
        matches = (function for key, function in self.maps.items() if key.startswith(prefix))
        return next(matches, None)