@@ -55,6 +55,7 @@ class Job(metaclass=JobBase):
     This is the core class for the package which is intended to be subclassed
     to allow the caching behaviour to be customised.
     """
+
     # All items are stored in memcache as a tuple (expiry, data). We don't use
     # the TTL functionality within memcache but implement our own. If the
     # expiry value is None, this indicates that there is already a job created
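
For orientation, the (expiry, data) convention described in that comment means staleness is tracked by the package itself rather than by memcache's TTL. A minimal sketch of how an item would be classified under this scheme (the helper and its state names are illustrative, not part of the package):

import time

def classify(item):
    # Illustrative helper mirroring the (expiry, data) tuple convention.
    # `item` is whatever cache.get(key) returned: None on a cache miss,
    # otherwise the stored tuple.
    if item is None:
        return 'MISS'
    expiry, data = item
    if expiry is None:
        # A refresh job has already been queued for this item.
        return 'REFRESH_QUEUED'
    return 'HIT' if time.time() < expiry else 'STALE'
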
@@ -104,8 +105,9 @@ def class_path(self):
         return '%s.%s' % (self.__module__, self.__class__.__name__)

     def __init__(self):
-        self.cache_alias = (self.cache_alias or
-                            getattr(settings, 'CACHEBACK_CACHE_ALIAS', DEFAULT_CACHE_ALIAS))
+        self.cache_alias = self.cache_alias or getattr(
+            settings, 'CACHEBACK_CACHE_ALIAS', DEFAULT_CACHE_ALIAS
+        )
         self.cache = caches[self.cache_alias]
         self.task_options = self.task_options or {}
@@ -150,17 +152,25 @@ def get(self, *raw_args, **raw_kwargs):
             #    the fetch has finished, or
             # b) trigger an async refresh and return an empty result
             if self.should_missing_item_be_fetched_synchronously(*args, **kwargs):
-                logger.debug(("Job %s with key '%s' - cache MISS - running "
-                              "synchronous refresh"),
-                             self.class_path, key)
+                logger.debug(
+                    ("Job %s with key '%s' - cache MISS - running " "synchronous refresh"),
+                    self.class_path,
+                    key,
+                )
                 result = self.refresh(*args, **kwargs)
                 return self.process_result(
-                    result, call=call, cache_status=self.MISS, sync_fetch=True)
+                    result, call=call, cache_status=self.MISS, sync_fetch=True
+                )

             else:
-                logger.debug(("Job %s with key '%s' - cache MISS - triggering "
-                              "async refresh and returning empty result"),
-                             self.class_path, key)
+                logger.debug(
+                    (
+                        "Job %s with key '%s' - cache MISS - triggering "
+                        "async refresh and returning empty result"
+                    ),
+                    self.class_path,
+                    key,
+                )
                 # To avoid cache hammering (ie lots of identical tasks
                 # to refresh the same cache item), we reset the cache with an
                 # empty result which will be returned until the cache is
@@ -169,8 +179,8 @@ def get(self, *raw_args, **raw_kwargs):
                 self.store(key, self.timeout(*args, **kwargs), result)
                 self.async_refresh(*args, **kwargs)
                 return self.process_result(
-                    result, call=call, cache_status=self.MISS,
-                    sync_fetch=False)
+                    result, call=call, cache_status=self.MISS, sync_fetch=False
+                )

         expiry, data = item
         delta = time.time() - expiry
@@ -180,30 +190,35 @@ def get(self, *raw_args, **raw_kwargs):
             #    the fetch has finished, or
             # b) trigger a refresh but allow the stale result to be
             #    returned this time. This is normally acceptable.
-            if self.should_stale_item_be_fetched_synchronously(
-                    delta, *args, **kwargs):
+            if self.should_stale_item_be_fetched_synchronously(delta, *args, **kwargs):
                 logger.debug(
-                    ("Job %s with key '%s' - STALE cache hit - running "
-                     "synchronous refresh"),
-                    self.class_path, key)
+                    ("Job %s with key '%s' - STALE cache hit - running " "synchronous refresh"),
+                    self.class_path,
+                    key,
+                )
                 result = self.refresh(*args, **kwargs)
                 return self.process_result(
-                    result, call=call, cache_status=self.STALE,
-                    sync_fetch=True)
+                    result, call=call, cache_status=self.STALE, sync_fetch=True
+                )

             else:
                 logger.debug(
-                    ("Job %s with key '%s' - STALE cache hit - triggering "
-                     "async refresh and returning stale result"),
-                    self.class_path, key)
+                    (
+                        "Job %s with key '%s' - STALE cache hit - triggering "
+                        "async refresh and returning stale result"
+                    ),
+                    self.class_path,
+                    key,
+                )
                 # We replace the item in the cache with a 'timeout' expiry - this
                 # prevents cache hammering but guards against a 'limbo' situation
                 # where the refresh task fails for some reason.
                 timeout = self.timeout(*args, **kwargs)
                 self.store(key, timeout, data)
                 self.async_refresh(*args, **kwargs)
                 return self.process_result(
-                    data, call=call, cache_status=self.STALE, sync_fetch=False)
+                    data, call=call, cache_status=self.STALE, sync_fetch=False
+                )
         else:
             logger.debug("Job %s with key '%s' - cache HIT", self.class_path, key)
             return self.process_result(data, call=call, cache_status=self.HIT)
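
The MISS/STALE/HIT branches in get() are steered by attributes on the Job subclass. A hedged sketch of a typical subclass (the class, its fetch body and the stub API call are invented for illustration; lifetime and fetch_on_miss are the attributes the synchronous-fetch decisions consult):

from cacheback.base import Job

def expensive_twitter_call(username):
    # Stand-in for a slow API call.
    return ['tweet one', 'tweet two']

class UserTweets(Job):
    lifetime = 600        # seconds before a cached result counts as stale
    fetch_on_miss = True  # on a cold MISS, block and fetch synchronously

    def fetch(self, username):
        return expensive_twitter_call(username)

# First call: MISS -> synchronous refresh (fetch_on_miss=True).
# Calls within `lifetime`: HIT. Later calls: STALE -> the stale value is
# returned immediately while an async refresh is queued.
tweets = UserTweets().get('codeinthehole')
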
@@ -261,8 +276,14 @@ def set(self, *raw_args, **raw_kwargs):

         expiry = self.expiry(*args, **kwargs)

-        logger.debug("Setting %s cache with key '%s', args '%r', kwargs '%r', expiry '%r'",
-                     self.class_path, key, args, kwargs, expiry)
+        logger.debug(
+            "Setting %s cache with key '%s', args '%r', kwargs '%r', expiry '%r'",
+            self.class_path,
+            key,
+            args,
+            kwargs,
+            expiry,
+        )

         self.store(key, expiry, data)

@@ -292,9 +313,7 @@ def store(self, key, expiry, data):
             # without warning.
             __, cached_data = self.cache.get(key, (None, None))
             if data is not None and cached_data is None:
-                raise RuntimeError(
-                    "Unable to save data of type %s to cache" % (
-                        type(data)))
+                raise RuntimeError("Unable to save data of type %s to cache" % (type(data)))

     def refresh(self, *args, **kwargs):
         """
@@ -319,22 +338,24 @@ def async_refresh(self, *args, **kwargs):
                     obj_args=self.get_init_args(),
                     obj_kwargs=self.get_init_kwargs(),
                     call_args=args,
-                    call_kwargs=kwargs
+                    call_kwargs=kwargs,
                 ),
-                task_options=self.task_options
+                task_options=self.task_options,
             )
         except Exception:
             # Handle exceptions from talking to RabbitMQ - eg connection
             # refused. When this happens, we try to run the task
             # synchronously.
-            logger.error("Unable to trigger task asynchronously - failing "
-                         "over to synchronous refresh", exc_info=True)
+            logger.error(
+                "Unable to trigger task asynchronously - failing "
+                "over to synchronous refresh",
+                exc_info=True,
+            )
             try:
                 return self.refresh(*args, **kwargs)
             except Exception as e:
                 # Something went wrong while running the task
-                logger.error("Unable to refresh data synchronously: %s", e,
-                             exc_info=True)
+                logger.error("Unable to refresh data synchronously: %s", e, exc_info=True)
             else:
                 logger.debug("Failover synchronous refresh completed successfully")

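
Stripped of the package specifics, this failover is a general "degrade to synchronous" pattern. A minimal sketch under the assumption that enqueue and refresh stand for any async-dispatch / direct-call pair (placeholders, not this package's API):

import logging

logger = logging.getLogger(__name__)

def refresh_with_failover(enqueue, refresh, *args, **kwargs):
    # Prefer the asynchronous path; degrade to a blocking call when the
    # broker (eg RabbitMQ) is unreachable.
    try:
        enqueue(*args, **kwargs)
    except Exception:
        logger.error("Async dispatch failed - falling over to synchronous refresh",
                     exc_info=True)
        try:
            return refresh(*args, **kwargs)
        except Exception as exc:
            logger.error("Synchronous refresh failed: %s", exc, exc_info=True)
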
@@ -368,13 +389,14 @@ def should_missing_item_be_fetched_synchronously(self, *args, **kwargs):

     def should_item_be_fetched_synchronously(self, *args, **kwargs):
         import warnings
+
         warnings.warn(
             "The method 'should_item_be_fetched_synchronously' is deprecated "
             "and will be removed in 0.5. Use "
             "'should_missing_item_be_fetched_synchronously' instead.",
-            DeprecationWarning)
-        return self.should_missing_item_be_fetched_synchronously(
-            *args, **kwargs)
+            DeprecationWarning,
+        )
+        return self.should_missing_item_be_fetched_synchronously(*args, **kwargs)

     def should_stale_item_be_fetched_synchronously(self, delta, *args, **kwargs):
         """
@@ -400,15 +422,18 @@ def key(self, *args, **kwargs):
             # The line might break if your passed values are un-hashable. If
             # it does, you need to override this method and implement your own
             # key algorithm.
-            return "%s:%s:%s:%s" % (self.class_path,
-                                    self.hash(args),
-                                    self.hash([k for k in sorted(kwargs)]),
-                                    self.hash([kwargs[k] for k in sorted(kwargs)]))
+            return "%s:%s:%s:%s" % (
+                self.class_path,
+                self.hash(args),
+                self.hash([k for k in sorted(kwargs)]),
+                self.hash([kwargs[k] for k in sorted(kwargs)]),
+            )
         except TypeError:
             raise RuntimeError(
                 "Unable to generate cache key due to unhashable "
                 "args or kwargs - you need to implement your own "
-                "key generation method to avoid this problem")
+                "key generation method to avoid this problem"
+            )

     def hash(self, value):
         """
@@ -450,7 +475,7 @@ def process_result(self, result, call, cache_status, sync_fetch=None):
     def job_refresh(cls, *args, **kwargs):
         warnings.warn(
             '`Job.job_refresh` is deprecated, use `perform_async_refresh` instead.',
-            RemovedInCacheback13Warning
+            RemovedInCacheback13Warning,
         )
         return cls.perform_async_refresh(*args, **kwargs)

@@ -473,18 +498,21 @@ def perform_async_refresh(cls, klass_str, obj_args, obj_kwargs, call_args, call_
         """
         klass = get_job_class(klass_str)
         if klass is None:
-            logger.error("Unable to construct %s with args %r and kwargs %r",
-                         klass_str, obj_args, obj_kwargs)
+            logger.error(
+                "Unable to construct %s with args %r and kwargs %r",
+                klass_str,
+                obj_args,
+                obj_kwargs,
+            )
             return

-        logger.info("Using %s with constructor args %r and kwargs %r",
-                    klass_str, obj_args, obj_kwargs)
-        logger.info("Calling refresh with args %r and kwargs %r", call_args,
-                    call_kwargs)
+        logger.info(
+            "Using %s with constructor args %r and kwargs %r", klass_str, obj_args, obj_kwargs
+        )
+        logger.info("Calling refresh with args %r and kwargs %r", call_args, call_kwargs)
         start = time.time()
         try:
-            klass(*obj_args, **obj_kwargs).refresh(
-                *call_args, **call_kwargs)
+            klass(*obj_args, **obj_kwargs).refresh(*call_args, **call_kwargs)
         except Exception as e:
             logger.exception("Error running job: '%s'", e)
         else: