-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
Copy pathrange.py
executable file
·796 lines (652 loc) · 33.1 KB
/
range.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
# -*- coding: utf-8 -*-
# Copyright (c) 2014 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Produces contiguous completed ranges of recurring tasks.
See ``RangeDaily`` and ``RangeHourly`` for basic usage.
Caveat - if gaps accumulate, their causes (e.g. missing dependencies) going
unmonitored/unmitigated, then this will eventually keep retrying the same gaps
over and over and make no progress to more recent times. (See ``task_limit``
and ``reverse`` parameters.)
TODO foolproof against that kind of misuse?
"""
from collections import Counter
import itertools
import functools
import logging
import warnings
import re
import time
from datetime import datetime, timedelta, date
from dateutil.relativedelta import relativedelta
import luigi
from luigi.parameter import ParameterException
from luigi.target import FileSystemTarget
from luigi.task import Register, flatten_output
# Module-level logger shared by everything in this file; it is obtained under
# the fixed name 'luigi-interface' rather than this module's own name.
logger = logging.getLogger('luigi-interface')
class RangeEvent(luigi.Event):  # Not sure if subclassing currently serves a purpose. Stringly typed, events are.
    """
    Events communicating useful metrics.

    ``COMPLETE_COUNT`` would normally be nondecreasing, and its derivative
    would describe performance (how many instances complete
    invocation-over-invocation).

    ``COMPLETE_FRACTION`` reaching 1 would be a telling event in case of a
    backfill with defined start and stop. Would not be strikingly useful for a
    typical recurring task without stop defined, fluctuating close to 1.

    ``DELAY`` is measured from the first found missing datehour till (current
    time + hours_forward), or till stop if it is defined. In hours for Hourly.
    TBD different units for other frequencies?
    TODO any different for reverse mode? From first missing till last missing?
    From last gap till stop?
    """
    # The values below are the event names passed to ``trigger_event``; they
    # are plain strings, so handlers must be registered under exactly these.
    COMPLETE_COUNT = "event.tools.range.complete.count"
    COMPLETE_FRACTION = "event.tools.range.complete.fraction"
    DELAY = "event.tools.range.delay"
class RangeBase(luigi.WrapperTask):
    """
    Produces a contiguous completed range of a recurring task.

    Made for the common use case where a task is parameterized by e.g.
    ``DateParameter``, and assurance is needed that any gaps arising from
    downtime are eventually filled.

    Emits events that one can use to monitor gaps and delays.

    At least one of start and stop needs to be specified.

    (This is quite an abstract base class for subclasses with different
    datetime parameter classes, e.g. ``DateParameter``, ``DateHourParameter``,
    ..., and different parameter naming, e.g. days_back/forward,
    hours_back/forward, ..., as well as different documentation wording,
    to improve user experience.)

    Subclasses will need to use the ``of`` parameter when overriding methods.
    """
    # TODO lift the single parameter constraint by passing unknown parameters through WrapperTask?
    of = luigi.TaskParameter(
        description="task name to be completed. The task must take a single datetime parameter")
    of_params = luigi.DictParameter(default=dict(), description="Arguments to be provided to the 'of' class when instantiating")
    # The common parameters 'start' and 'stop' have type (e.g. DateParameter,
    # DateHourParameter) dependent on the concrete subclass, cumbersome to
    # define here generically without dark magic. Refer to the overrides.
    start = luigi.Parameter()
    stop = luigi.Parameter()
    reverse = luigi.BoolParameter(
        default=False,
        description="specifies the preferred order for catching up. False - work from the oldest missing outputs onward; True - from the newest backward")
    task_limit = luigi.IntParameter(
        default=50,
        description="how many of 'of' tasks to require. Guards against scheduling insane amounts of tasks in one go")
    # TODO overridable exclude_datetimes or something...
    now = luigi.IntParameter(
        default=None,
        description="set to override current time. In seconds since epoch")
    param_name = luigi.Parameter(
        default=None,
        description="parameter name used to pass in parameterized value. Defaults to None, meaning use first positional parameter",
        positional=False)

    @property
    def of_cls(self):
        """
        DONT USE. Will be deleted soon. Use ``self.of``!

        Returns ``self.of`` as a task class; when ``of`` was given as a
        string, a warning is issued and the class is looked up by name in
        the task ``Register``.
        """
        if isinstance(self.of, str):
            warnings.warn('When using Range programatically, dont pass "of" param as string!')
            return Register.get_task_cls(self.of)
        return self.of

    # a bunch of datetime arithmetic building blocks that need to be provided in subclasses
    def datetime_to_parameter(self, dt):
        """
        Converts a datetime into the value of the ranged task parameter.
        Must be provided by subclasses.
        """
        raise NotImplementedError

    def parameter_to_datetime(self, p):
        """
        Converts a value of the ranged task parameter into a datetime.
        Must be provided by subclasses.
        """
        raise NotImplementedError

    def datetime_to_parameters(self, dt):
        """
        Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
        """
        raise NotImplementedError

    def parameters_to_datetime(self, p):
        """
        Given a dictionary of parameters, will extract the ranged task parameter value
        """
        raise NotImplementedError

    def moving_start(self, now):
        """
        Returns a datetime from which to ensure contiguousness in the case when
        start is None or unfeasibly far back.
        """
        raise NotImplementedError

    def moving_stop(self, now):
        """
        Returns a datetime till which to ensure contiguousness in the case when
        stop is None or unfeasibly far forward.
        """
        raise NotImplementedError

    def finite_datetimes(self, finite_start, finite_stop):
        """
        Returns the individual datetimes in interval [finite_start, finite_stop)
        for which task completeness should be required, as a sorted list.
        """
        raise NotImplementedError

    def _emit_metrics(self, missing_datetimes, finite_start, finite_stop):
        """
        For consistent metrics one should consider the entire range, but
        it is open (infinite) if stop or start is None.
        Hence make do with metrics respective to the finite simplification.

        Triggers ``RangeEvent.DELAY``, ``RangeEvent.COMPLETE_COUNT`` and
        ``RangeEvent.COMPLETE_FRACTION``, each with the task family of ``of``
        and the computed value.
        """
        # Widen the finite window to also cover an explicitly given start/stop,
        # so the metrics describe the whole requested range where it is finite.
        datetimes = self.finite_datetimes(
            finite_start if self.start is None else min(finite_start, self.parameter_to_datetime(self.start)),
            finite_stop if self.stop is None else max(finite_stop, self.parameter_to_datetime(self.stop)))

        # Delay = number of expected instances from the first missing one to
        # the end of the range; 0 when nothing is expected or nothing is missing.
        delay_in_jobs = len(datetimes) - datetimes.index(missing_datetimes[0]) if datetimes and missing_datetimes else 0
        self.trigger_event(RangeEvent.DELAY, self.of_cls.task_family, delay_in_jobs)

        expected_count = len(datetimes)
        complete_count = expected_count - len(missing_datetimes)
        self.trigger_event(RangeEvent.COMPLETE_COUNT, self.of_cls.task_family, complete_count)
        # An empty range counts as fully complete (fraction 1), avoiding a division by zero.
        self.trigger_event(RangeEvent.COMPLETE_FRACTION, self.of_cls.task_family, float(complete_count) / expected_count if expected_count else 1)

    def _format_datetime(self, dt):
        """
        Renders a datetime for log messages; by default the ranged parameter value.
        """
        return self.datetime_to_parameter(dt)

    def _format_range(self, datetimes):
        """
        Renders the first and last element of a non-empty datetime list as
        ``[first, last]`` for log messages.
        """
        param_first = self._format_datetime(datetimes[0])
        param_last = self._format_datetime(datetimes[-1])
        return '[%s, %s]' % (param_first, param_last)

    def _instantiate_task_cls(self, param):
        """
        Instantiates ``of`` with ``of_params`` plus the given ranged parameter value.
        """
        return self.of(**self._task_parameters(param))

    @property
    def _param_name(self):
        """
        Name under which the ranged value is passed to ``of``: ``param_name``
        if set, otherwise the name of the first positional parameter of ``of``.
        """
        if self.param_name is None:
            return next(x[0] for x in self.of.get_params() if x[1].positional)
        else:
            return self.param_name

    def _task_parameters(self, param):
        """
        Returns a fresh dict of ``of_params`` with the ranged parameter set to ``param``.
        """
        kwargs = dict(**self.of_params)
        kwargs[self._param_name] = param
        return kwargs

    def requires(self):
        """
        Returns the ``of`` task instances that still need to run, at most
        ``task_limit`` of them, oldest first or (if ``reverse``) newest first.

        The result is computed once per instance and cached. Metrics events
        are emitted as a side effect of the first computation.

        Raises ``ParameterException`` when neither start nor stop is given,
        when start is missing without ``reverse``, or when start > stop.
        """
        # cache because we anticipate a fair amount of computation
        if hasattr(self, '_cached_requires'):
            return self._cached_requires

        if not self.start and not self.stop:
            raise ParameterException("At least one of start and stop needs to be specified")
        if not self.start and not self.reverse:
            raise ParameterException("Either start needs to be specified or reverse needs to be True")
        if self.start and self.stop and self.start > self.stop:
            raise ParameterException("Can't have start > stop")
        # TODO check overridden complete() and exists()

        now = datetime.utcfromtimestamp(time.time() if self.now is None else self.now)

        # Clamp the requested range to the moving [now - back, now + forward] window.
        moving_start = self.moving_start(now)
        finite_start = moving_start if self.start is None else max(self.parameter_to_datetime(self.start), moving_start)
        moving_stop = self.moving_stop(now)
        finite_stop = moving_stop if self.stop is None else min(self.parameter_to_datetime(self.stop), moving_stop)

        datetimes = self.finite_datetimes(finite_start, finite_stop) if finite_start <= finite_stop else []

        if datetimes:
            logger.debug('Actually checking if range %s of %s is complete',
                         self._format_range(datetimes), self.of_cls.task_family)
            missing_datetimes = sorted(self._missing_datetimes(datetimes))
            logger.debug('Range %s lacked %d of expected %d %s instances',
                         self._format_range(datetimes), len(missing_datetimes), len(datetimes), self.of_cls.task_family)
        else:
            missing_datetimes = []
            logger.debug('Empty range. No %s instances expected', self.of_cls.task_family)

        self._emit_metrics(missing_datetimes, finite_start, finite_stop)

        if self.reverse:
            # A bare ``[-self.task_limit:]`` would select *everything* for
            # task_limit == 0 (since -0 == 0), unlike the forward branch where
            # ``[:0]`` selects nothing. Guard so that a zero limit means zero.
            required_datetimes = missing_datetimes[-self.task_limit:] if self.task_limit else []
        else:
            required_datetimes = missing_datetimes[:self.task_limit]
        if required_datetimes:
            logger.debug('Requiring %d missing %s instances in range %s',
                         len(required_datetimes), self.of_cls.task_family, self._format_range(required_datetimes))
        if self.reverse:
            required_datetimes.reverse()  # TODO priorities, so that within the batch tasks are ordered too

        self._cached_requires = [self._instantiate_task_cls(self.datetime_to_parameter(d)) for d in required_datetimes]
        return self._cached_requires

    def missing_datetimes(self, finite_datetimes):
        """
        Override in subclasses to do bulk checks.

        Returns a sorted list.

        This is a conservative base implementation that brutally checks completeness, instance by instance.

        Inadvisable as it may be slow.
        """
        return [d for d in finite_datetimes if not self._instantiate_task_cls(self.datetime_to_parameter(d)).complete()]

    def _missing_datetimes(self, finite_datetimes):
        """
        Backward compatible wrapper. Will be deleted eventually (stated on Dec 2015)

        Calls ``missing_datetimes(finite_datetimes)``; if that fails with a
        ``TypeError`` mentioning ``missing_datetimes()``, warns and retries
        with the legacy two-argument form ``(task_cls, finite_datetimes)``.
        """
        try:
            return self.missing_datetimes(finite_datetimes)
        except TypeError as ex:
            if 'missing_datetimes()' in repr(ex):
                warnings.warn('In your Range* subclass, missing_datetimes() should only take 1 argument (see latest docs)')
                return self.missing_datetimes(self.of_cls, finite_datetimes)
            else:
                raise
class RangeDailyBase(RangeBase):
    """
    Range over a task that recurs once per day.

    Supplies the day-granularity datetime arithmetic that ``RangeBase``
    leaves abstract; the ranged parameter value is a ``date``.
    """
    start = luigi.DateParameter(
        default=None,
        description="beginning date, inclusive. Default: None - work backward forever (requires reverse=True)")
    stop = luigi.DateParameter(
        default=None,
        description="ending date, exclusive. Default: None - work forward forever")
    days_back = luigi.IntParameter(
        default=100,  # slightly more than three months
        description=("extent to which contiguousness is to be assured into "
                     "past, in days from current time. Prevents infinite loop "
                     "when start is none. If the dataset has limited retention"
                     " (i.e. old outputs get removed), this should be set "
                     "shorter to that, too, to prevent the oldest outputs "
                     "flapping. Increase freely if you intend to process old "
                     "dates - worker's memory is the limit"))
    days_forward = luigi.IntParameter(
        default=0,
        description="extent to which contiguousness is to be assured into future, in days from current time. Prevents infinite loop when stop is none")

    def datetime_to_parameter(self, dt):
        """Drops the time of day: the parameter is the calendar date of ``dt``."""
        return dt.date()

    def parameter_to_datetime(self, p):
        """Returns midnight at the start of the date ``p``."""
        return datetime(p.year, p.month, p.day)

    def datetime_to_parameters(self, dt):
        """
        Builds the full keyword dictionary for ``of``: the of-params plus the
        ranged parameter set to the date of ``dt``.
        """
        return self._task_parameters(dt.date())

    def parameters_to_datetime(self, p):
        """
        Picks the ranged parameter out of the dictionary ``p`` and returns it
        as a datetime at midnight.
        """
        day = p[self._param_name]
        return datetime(day.year, day.month, day.day)

    def moving_start(self, now):
        """Returns ``now`` shifted ``days_back`` days into the past."""
        lookback = timedelta(days=self.days_back)
        return now - lookback

    def moving_stop(self, now):
        """Returns ``now`` shifted ``days_forward`` days into the future."""
        lookahead = timedelta(days=self.days_forward)
        return now + lookahead

    def finite_datetimes(self, finite_start, finite_stop):
        """
        Lists every midnight ``t`` with ``finite_start <= t < finite_stop``,
        in ascending order.
        """
        one_day = timedelta(days=1)
        # Begin at the midnight on or before finite_start; it is only kept
        # if it is not earlier than finite_start itself.
        cursor = datetime(finite_start.year, finite_start.month, finite_start.day)
        midnights = []
        while cursor < finite_stop:
            if cursor >= finite_start:
                midnights.append(cursor)
            cursor = cursor + one_day
        return midnights
class RangeHourlyBase(RangeBase):
    """
    Range over a task that recurs once per hour.

    Supplies the hour-granularity datetime arithmetic that ``RangeBase``
    leaves abstract; the ranged parameter value is the datetime itself.
    """
    start = luigi.DateHourParameter(
        default=None,
        description="beginning datehour, inclusive. Default: None - work backward forever (requires reverse=True)")
    stop = luigi.DateHourParameter(
        default=None,
        description="ending datehour, exclusive. Default: None - work forward forever")
    hours_back = luigi.IntParameter(
        default=100 * 24,  # slightly more than three months
        description=("extent to which contiguousness is to be assured into "
                     "past, in hours from current time. Prevents infinite "
                     "loop when start is none. If the dataset has limited "
                     "retention (i.e. old outputs get removed), this should "
                     "be set shorter to that, too, to prevent the oldest "
                     "outputs flapping. Increase freely if you intend to "
                     "process old dates - worker's memory is the limit"))
    # TODO always entire interval for reprocessings (fixed start and stop)?
    hours_forward = luigi.IntParameter(
        default=0,
        description="extent to which contiguousness is to be assured into future, in hours from current time. Prevents infinite loop when stop is none")

    def datetime_to_parameter(self, dt):
        """The parameter value is the datetime, passed through unchanged."""
        return dt

    def parameter_to_datetime(self, p):
        """The parameter value already is a datetime; returned unchanged."""
        return p

    def datetime_to_parameters(self, dt):
        """
        Builds the full keyword dictionary for ``of``: the of-params plus the
        ranged parameter set to ``dt``.
        """
        return self._task_parameters(dt)

    def parameters_to_datetime(self, p):
        """
        Picks the ranged parameter out of the dictionary ``p`` and returns it as is.
        """
        return p[self._param_name]

    def moving_start(self, now):
        """Returns ``now`` shifted ``hours_back`` hours into the past."""
        lookback = timedelta(hours=self.hours_back)
        return now - lookback

    def moving_stop(self, now):
        """Returns ``now`` shifted ``hours_forward`` hours into the future."""
        lookahead = timedelta(hours=self.hours_forward)
        return now + lookahead

    def finite_datetimes(self, finite_start, finite_stop):
        """
        Lists every whole hour ``t`` with ``finite_start <= t < finite_stop``,
        in ascending order.
        """
        one_hour = timedelta(hours=1)
        # Begin at the whole hour on or before finite_start; it is only kept
        # if it is not earlier than finite_start itself.
        cursor = datetime(finite_start.year, finite_start.month, finite_start.day, finite_start.hour)
        whole_hours = []
        while cursor < finite_stop:
            if cursor >= finite_start:
                whole_hours.append(cursor)
            cursor = cursor + one_hour
        return whole_hours

    def _format_datetime(self, dt):
        """Renders ``dt`` for log messages via ``DateHourParameter`` serialization."""
        return luigi.DateHourParameter().serialize(dt)
class RangeByMinutesBase(RangeBase):
    """
    Range over a task that recurs every ``minutes_interval`` minutes.

    Supplies the minute-granularity datetime arithmetic that ``RangeBase``
    leaves abstract; the ranged parameter value is the datetime itself.
    """
    start = luigi.DateMinuteParameter(
        default=None,
        description="beginning date-hour-minute, inclusive. Default: None - work backward forever (requires reverse=True)")
    stop = luigi.DateMinuteParameter(
        default=None,
        description="ending date-hour-minute, exclusive. Default: None - work forward forever")
    minutes_back = luigi.IntParameter(
        default=60*24,  # one day
        description=("extent to which contiguousness is to be assured into "
                     "past, in minutes from current time. Prevents infinite "
                     "loop when start is none. If the dataset has limited "
                     "retention (i.e. old outputs get removed), this should "
                     "be set shorter to that, too, to prevent the oldest "
                     "outputs flapping. Increase freely if you intend to "
                     "process old dates - worker's memory is the limit"))
    minutes_forward = luigi.IntParameter(
        default=0,
        description="extent to which contiguousness is to be assured into future, "
                    "in minutes from current time. Prevents infinite loop when stop is none")
    minutes_interval = luigi.IntParameter(
        default=1,
        description="separation between events in minutes. It must evenly divide 60"
    )

    def datetime_to_parameter(self, dt):
        """The parameter value is the datetime, passed through unchanged."""
        return dt

    def parameter_to_datetime(self, p):
        """The parameter value already is a datetime; returned unchanged."""
        return p

    def datetime_to_parameters(self, dt):
        """
        Builds the full keyword dictionary for ``of``: the of-params plus the
        ranged parameter set to ``dt``.
        """
        return self._task_parameters(dt)

    def parameters_to_datetime(self, p):
        """
        Picks the ranged parameter out of the dictionary ``p`` and returns it
        truncated to whole minutes.
        """
        moment = p[self._param_name]
        return datetime(moment.year, moment.month, moment.day, moment.hour, moment.minute)

    def moving_start(self, now):
        """Returns ``now`` shifted ``minutes_back`` minutes into the past."""
        lookback = timedelta(minutes=self.minutes_back)
        return now - lookback

    def moving_stop(self, now):
        """Returns ``now`` shifted ``minutes_forward`` minutes into the future."""
        lookahead = timedelta(minutes=self.minutes_forward)
        return now + lookahead

    def finite_datetimes(self, finite_start, finite_stop):
        """
        Lists every interval boundary ``t`` with
        ``finite_start <= t < finite_stop``, in ascending order.

        Raises ``ParameterException`` unless ``minutes_interval`` lies strictly
        between 0 and 60 and divides 60 without remainder.
        """
        interval = self.minutes_interval
        # Reject intervals outside the open range (0, 60) first ...
        if interval <= 0 or interval >= 60:
            raise ParameterException('minutes-interval must be within 0..60')
        # ... then those that do not split the hour evenly.
        if 60 % interval:
            raise ParameterException('minutes-interval does not evenly divide 60')

        # Round the starting minute down to an interval boundary,
        # e.g. 20:13 with an interval of 5 becomes 20:10.
        aligned_minute = (finite_start.minute // interval) * interval
        cursor = datetime(
            year=finite_start.year,
            month=finite_start.month,
            day=finite_start.day,
            hour=finite_start.hour,
            minute=aligned_minute)

        step = timedelta(minutes=interval)
        boundaries = []
        while cursor < finite_stop:
            if cursor >= finite_start:
                boundaries.append(cursor)
            cursor = cursor + step
        return boundaries

    def _format_datetime(self, dt):
        """Renders ``dt`` for log messages via ``DateMinuteParameter`` serialization."""
        return luigi.DateMinuteParameter().serialize(dt)
def _constrain_glob(glob, paths, limit=5):
"""
Tweaks glob into a list of more specific globs that together still cover paths and not too much extra.
Saves us minutes long listings for long dataset histories.
Specifically, in this implementation the leftmost occurrences of "[0-9]"
give rise to a few separate globs that each specialize the expression to
digits that actually occur in paths.
"""
def digit_set_wildcard(chars):
"""
Makes a wildcard expression for the set, a bit readable, e.g. [1-5].
"""
chars = sorted(chars)
if len(chars) > 1 and ord(chars[-1]) - ord(chars[0]) == len(chars) - 1:
return '[%s-%s]' % (chars[0], chars[-1])
else:
return '[%s]' % ''.join(chars)
current = {glob: paths}
while True:
pos = list(current.keys())[0].find('[0-9]')
if pos == -1:
# no wildcard expressions left to specialize in the glob
return list(current.keys())
char_sets = {}
for g, p in current.items():
char_sets[g] = sorted({path[pos] for path in p})
if sum(len(s) for s in char_sets.values()) > limit:
return [g.replace('[0-9]', digit_set_wildcard(char_sets[g]), 1) for g in current]
for g, s in char_sets.items():
for c in s:
new_glob = g.replace('[0-9]', c, 1)
new_paths = list(filter(lambda p: p[pos] == c, current[g]))
current[new_glob] = new_paths
del current[g]
def most_common(items):
    """
    Returns ``(value, count)`` for the value occurring most often in ``items``.

    ``items`` may be any iterable of hashable values. An empty iterable makes
    the single-element unpacking below fail with ``ValueError``.
    """
    tally = Counter(items)
    (winner, occurrences), = tally.most_common(1)
    return winner, occurrences
def _get_per_location_glob(tasks, outputs, regexes):
    """
    Derives, for one logical output, a glob that lists existing output paths.

    Each sample output path is searched with its corresponding regex; the
    character spans the regex groups matched are replaced by ``[0-9]``
    wildcards in the first sample path, and the last path component is
    dropped from the result.

    Esoteric reverse engineering, but worth it given that (compared to an
    equivalent contiguousness guarantee by naive complete() checks)
    requests to the filesystem are cut by orders of magnitude, and users
    don't even have to retrofit existing tasks anyhow.

    Raises ``NotImplementedError`` when a regex does not match its path.
    """
    paths = [output.path for output in outputs]

    # naive, because some matches could be confused by numbers earlier
    # in path, e.g. /foo/fifa2000k/bar/2000-12-31/00
    matches = []
    for regex, path in zip(regexes, paths):
        matches.append(regex.search(path))

    for match, path, task in zip(matches, paths, tasks):
        if match is None:
            raise NotImplementedError("Couldn't deduce datehour representation in output path %r of task %s" % (path, task))

    # the most common position of every group is likely
    # to be conclusive hit or miss
    group_count = len(matches[0].groups())
    positions = []
    for group in range(1, group_count + 1):
        span, _ = most_common((match.start(group), match.end(group)) for match in matches)
        positions.append(span)

    # Work on a list of characters; every replaced character becomes one
    # '[0-9]' element, so the list length (and later spans) stay valid.
    pieces = list(paths[0])  # FIXME sanity check that it's the same for all paths
    for begin, finish in positions:
        pieces[begin:finish] = ['[0-9]'] * (finish - begin)

    # chop off the last path item
    # (wouldn't need to if `hadoop fs -ls -d` equivalent were available)
    full_glob = ''.join(pieces)
    return full_glob.rsplit('/', 1)[0]
def _get_filesystems_and_globs(datetime_to_task, datetime_to_re):
    """
    Generates one ``(filesystem, glob)`` pair per output location of the task.

    The task can have one or several FileSystemTarget outputs.

    For convenience, the task can be a luigi.WrapperTask,
    in which case outputs of all its dependencies are considered.

    ``datetime_to_task`` maps a datetime to a task instance and
    ``datetime_to_re`` maps a datetime to a regex pattern string. Raises
    ``NotImplementedError`` if the number of outputs differs between sample
    datetimes or an output is not a ``FileSystemTarget``.
    """
    # probe some scattered datetimes unlikely to all occur in paths, other than by being sincere datetime parameter's representations
    # TODO limit to [self.start, self.stop) so messages are less confusing? Done trivially it can kill correctness
    sample_datetimes = [
        datetime(year, month, day, hour)
        for year, month, day, hour in itertools.product(
            range(2000, 2050, 10), range(1, 4), range(5, 8), range(21, 24))
    ]
    regexes = [re.compile(datetime_to_re(moment)) for moment in sample_datetimes]
    sample_tasks = [datetime_to_task(moment) for moment in sample_datetimes]
    sample_outputs = [flatten_output(task) for task in sample_tasks]

    reference_outputs = sample_outputs[0]
    for outputs, task in zip(sample_outputs, sample_tasks):
        if len(outputs) != len(reference_outputs):
            raise NotImplementedError("Outputs must be consistent over time, sorry; was %r for %r and %r for %r" % (outputs, task, sample_outputs[0], sample_tasks[0]))
            # TODO fall back on requiring last couple of days? to avoid astonishing blocking when changes like that are deployed
            # erm, actually it's not hard to test entire hours_back..hours_forward and split into consistent subranges FIXME?
        for target in outputs:
            if not isinstance(target, FileSystemTarget):
                raise NotImplementedError("Output targets must be instances of FileSystemTarget; was %r for %r" % (target, task))

    # transposed, so here we're iterating over logical outputs, not datetimes
    for location_outputs in zip(*sample_outputs):
        location_glob = _get_per_location_glob(sample_tasks, location_outputs, regexes)
        yield location_outputs[0].fs, location_glob
def _list_existing(filesystem, glob, paths):
    """
    Lists what exists under the constrained variants of ``glob``.

    Takes a luigi.target.FileSystem object, a str which represents a glob and
    a list of strings representing paths. Returns a set of everything that
    ``filesystem.listdir`` reported for those constrained globs which
    ``filesystem.exists`` confirmed.
    """
    narrowed_globs = _constrain_glob(glob, paths)
    started_at = time.time()
    found = []
    for pattern in sorted(narrowed_globs):
        logger.debug('Listing %s', pattern)
        # Skip patterns that match nothing instead of listing them.
        if not filesystem.exists(pattern):
            continue
        found.extend(filesystem.listdir(pattern))
    elapsed = time.time() - started_at
    logger.debug('%d %s listings took %f s to return %d items',
                 len(narrowed_globs), filesystem.__class__.__name__, elapsed, len(found))
    return set(found)
def infer_bulk_complete_from_fs(datetimes, datetime_to_task, datetime_to_re):
    """
    Works out which of ``datetimes`` are missing by listing the filesystem.

    The current implementation works for the common case of a task writing
    output to a ``FileSystemTarget`` whose path is built using strftime with
    format like '...%Y...%m...%d...%H...', without custom ``complete()`` or
    ``exists()``.

    (Eventually Luigi could have ranges of completion as first-class citizens.
    Then this listing business could be factored away/be provided for
    explicitly in target API or some kind of a history server.)

    Returns the datetimes, in input order, for which at least one output
    path was not found in the listings.
    """
    locations = _get_filesystems_and_globs(datetime_to_task, datetime_to_re)
    paths_by_datetime = []
    for moment in datetimes:
        outputs = flatten_output(datetime_to_task(moment))
        paths_by_datetime.append([output.path for output in outputs])

    # transposed, so here we're iterating over logical outputs, not datetimes
    paths_by_location = zip(*paths_by_datetime)
    existing = set()
    for (filesystem, glob), location_paths in zip(locations, paths_by_location):
        existing |= _list_existing(filesystem, glob, location_paths)

    # quickly learn everything that's missing
    return [
        moment
        for moment, expected_paths in zip(datetimes, paths_by_datetime)
        if not existing.issuperset(expected_paths)
    ]
class RangeMonthly(RangeBase):
    """
    Range over a task that recurs once per month.

    Unlike the Range* classes with shorter intervals, this class does not perform bulk optimisation.
    It is assumed that the number of months is low enough not to motivate the increased complexity.
    Hence, there is no class RangeMonthlyBase.
    """
    start = luigi.MonthParameter(
        default=None,
        description="beginning month, inclusive. Default: None - work backward forever (requires reverse=True)")
    stop = luigi.MonthParameter(
        default=None,
        description="ending month, exclusive. Default: None - work forward forever")
    months_back = luigi.IntParameter(
        default=13,  # Little over a year
        description=("extent to which contiguousness is to be assured into "
                     "past, in months from current time. Prevents infinite loop "
                     "when start is none. If the dataset has limited retention"
                     " (i.e. old outputs get removed), this should be set "
                     "shorter to that, too, to prevent the oldest outputs "
                     "flapping. Increase freely if you intend to process old "
                     "dates - worker's memory is the limit"))
    months_forward = luigi.IntParameter(
        default=0,
        description="extent to which contiguousness is to be assured into future, in months from current time. "
                    "Prevents infinite loop when stop is none")

    def _align(self, dt):
        """Returns midnight on the first day of the month containing ``dt``."""
        return datetime(dt.year, dt.month, 1)

    def datetime_to_parameter(self, dt):
        """The parameter is the first day of the month of ``dt``, as a ``date``."""
        return date(dt.year, dt.month, 1)

    def parameter_to_datetime(self, p):
        """Returns midnight on the first day of the month of ``p``."""
        return datetime(p.year, p.month, 1)

    def datetime_to_parameters(self, dt):
        """
        Builds the full keyword dictionary for ``of``: the of-params plus the
        ranged parameter set to the date of ``dt``.
        """
        return self._task_parameters(dt.date())

    def parameters_to_datetime(self, p):
        """
        Picks the ranged parameter out of the dictionary ``p`` and returns
        midnight on the first day of its month.
        """
        month = p[self._param_name]
        return datetime(month.year, month.month, 1)

    def _format_datetime(self, dt):
        """Renders ``dt`` for log messages as year and month."""
        return dt.strftime('%Y-%m')

    def moving_start(self, now):
        """Returns the start of the current month shifted ``months_back`` months back."""
        lookback = relativedelta(months=self.months_back)
        return self._align(now) - lookback

    def moving_stop(self, now):
        """Returns the start of the current month shifted ``months_forward`` months ahead."""
        lookahead = relativedelta(months=self.months_forward)
        return self._align(now) + lookahead

    def finite_datetimes(self, finite_start, finite_stop):
        """
        Lists every first-of-month ``t`` with ``finite_start <= t`` that lies
        before the start of the month containing ``finite_stop``, ascending.
        """
        first_month = self._align(finite_start)
        stop_month = self._align(finite_stop)
        months = []
        offset = 0
        while True:
            candidate = first_month + relativedelta(months=offset)
            if candidate >= stop_month:
                return months
            # The aligned first month may precede finite_start; skip it then.
            if candidate >= finite_start:
                months.append(candidate)
            offset += 1
class RangeDaily(RangeDailyBase):
    """Efficiently produces a contiguous completed range of a daily recurring
    task that takes a single ``DateParameter``.

    Falls back to infer it from output filesystem listing to facilitate the
    common case usage.

    Convenient to use even from command line, like:

    .. code-block:: console

        luigi --module your.module RangeDaily --of YourActualTask --start 2014-01-01
    """

    def missing_datetimes(self, finite_datetimes):
        """
        Returns the datetimes from ``finite_datetimes`` that are not complete.

        First asks ``of.bulk_complete``; the result is then a set. If that
        raises ``NotImplementedError``, completeness is inferred from a
        filesystem listing instead (see ``infer_bulk_complete_from_fs``),
        which returns a list.
        """
        try:
            # The function underlying the bulk_complete classmethod is called
            # with a partial in place of the class - presumably so that
            # of_params are applied wherever it instantiates ``cls(...)``.
            cls_with_params = functools.partial(self.of, **self.of_params)
            # Hand over a real list rather than a one-shot ``map`` iterator,
            # as RangeHourly does: a user-supplied bulk_complete may want to
            # take len() of the parameters or iterate over them twice.
            complete_parameters = self.of.bulk_complete.__func__(cls_with_params, list(map(self.datetime_to_parameter, finite_datetimes)))
            return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters))
        except NotImplementedError:
            return infer_bulk_complete_from_fs(
                finite_datetimes,
                lambda d: self._instantiate_task_cls(self.datetime_to_parameter(d)),
                lambda d: d.strftime('(%Y).*(%m).*(%d)'))
class RangeHourly(RangeHourlyBase):
    """Efficiently produces a contiguous completed range of an hourly recurring
    task that takes a single ``DateHourParameter``.

    Benefits from ``bulk_complete`` information to efficiently cover gaps.

    Falls back to infer it from output filesystem listing to facilitate the
    common case usage.

    Convenient to use even from command line, like:

    .. code-block:: console

        luigi --module your.module RangeHourly --of YourActualTask --start 2014-01-01T00
    """

    def missing_datetimes(self, finite_datetimes):
        """
        Returns the datetimes from ``finite_datetimes`` that are not complete.

        The ``bulk_complete`` route yields a set; when it raises
        ``NotImplementedError`` the filesystem-listing fallback is used,
        which yields a list.
        """
        try:
            # TODO: Why is there a list() here but not for the RangeDaily??
            candidate_parameters = [self.datetime_to_parameter(d) for d in finite_datetimes]
            cls_with_params = functools.partial(self.of, **self.of_params)
            bulk_complete = self.of.bulk_complete.__func__
            complete_parameters = bulk_complete(cls_with_params, candidate_parameters)
            complete_datetimes = {self.parameter_to_datetime(p) for p in complete_parameters}
            return set(finite_datetimes) - complete_datetimes
        except NotImplementedError:
            def task_for(d):
                return self._instantiate_task_cls(self.datetime_to_parameter(d))

            def pattern_for(d):
                return d.strftime('(%Y).*(%m).*(%d).*(%H)')

            return infer_bulk_complete_from_fs(finite_datetimes, task_for, pattern_for)
class RangeByMinutes(RangeByMinutesBase):
    """Efficiently produces a contiguous completed range of an recurring
    task every interval minutes that takes a single ``DateMinuteParameter``.

    Benefits from ``bulk_complete`` information to efficiently cover gaps.

    Falls back to infer it from output filesystem listing to facilitate the
    common case usage.

    Convenient to use even from command line, like:

    .. code-block:: console

        luigi --module your.module RangeByMinutes --of YourActualTask --start 2014-01-01T0123
    """

    def missing_datetimes(self, finite_datetimes):
        """
        Returns the datetimes from ``finite_datetimes`` that are not complete.

        First asks ``of.bulk_complete``; the result is then a set. If that
        raises ``NotImplementedError``, completeness is inferred from a
        filesystem listing instead (see ``infer_bulk_complete_from_fs``),
        which returns a list.
        """
        try:
            # The function underlying the bulk_complete classmethod is called
            # with a partial in place of the class - presumably so that
            # of_params are applied wherever it instantiates ``cls(...)``.
            cls_with_params = functools.partial(self.of, **self.of_params)
            # Hand over a real list rather than a one-shot ``map`` iterator,
            # as RangeHourly does: a user-supplied bulk_complete may want to
            # take len() of the parameters or iterate over them twice.
            complete_parameters = self.of.bulk_complete.__func__(cls_with_params, list(map(self.datetime_to_parameter, finite_datetimes)))
            return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters))
        except NotImplementedError:
            return infer_bulk_complete_from_fs(
                finite_datetimes,
                lambda d: self._instantiate_task_cls(self.datetime_to_parameter(d)),
                lambda d: d.strftime('(%Y).*(%m).*(%d).*(%H).*(%M)'))