-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.py
59 lines (38 loc) · 1.22 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import logging
from datetime import datetime
import dramatiq
from dramatiq.brokers.stub import StubBroker
from periodiq import cron, PeriodiqMiddleware
# Build a stub broker, attach the Periodiq middleware to it, then install it
# as dramatiq's global broker.
broker = StubBroker()
broker.add_middleware(PeriodiqMiddleware())
dramatiq.set_broker(broker)
# Module-level logger shared by the actors in this file.
logger = logging.getLogger(__name__)
# Captured once at import time (naive local time); `now.hour` is used further
# down to build the cron expression of the `daily` actor.
now = datetime.now()
@dramatiq.actor(
    periodic=cron('* * * * *'),
)
def minutely():
    """Log a marker line; declared periodic with cron spec ``* * * * *`` (every minute)."""
    logger.info("Minutely.")
@dramatiq.actor(
    periodic=cron('*/15 * * * *'),
)
def quarthourly():
    """Log a marker line; declared periodic with cron spec ``*/15 * * * *`` (every 15 minutes)."""
    logger.info("Quart-hourly.")
@dramatiq.actor(
    periodic=cron('@hourly'),
)
def hourly():
    """Log a marker line; declared periodic with the ``@hourly`` cron shortcut."""
    logger.info("Hourly.")
@dramatiq.actor(
    periodic=cron('1 2 * * *'),
)
def dst():
    """Log a marker line; declared periodic with cron spec ``1 2 * * *`` (daily at 02:01).

    Per the logged message, this slot is expected to be skipped on the
    daylight-saving-time change in the Europe/Paris timezone.
    """
    logger.info("Skipped on daylight saving time change in Europe/Paris.")
# Testing aid: the daily job is pinned to minute 58 of the hour in which this
# module was imported (taken from the module-level `now`).
@dramatiq.actor(
    periodic=cron('58 {hour} * * *'.format(hour=now.hour)),
)
def daily():
    """Log a marker line; declared periodic once a day at minute 58 of ``now.hour``."""
    logger.info("Daily.")
@dramatiq.actor(
    periodic=cron('30 10 * * Sun'),
)
def weekly():
    """Log a marker line; declared periodic with cron spec ``30 10 * * Sun`` (Sundays at 10:30)."""
    logger.info("Ding dong ding dong!")
@dramatiq.actor(
    periodic=cron('0 18 1 * *'),
)
def monthly():
    """Log a marker line; declared periodic with cron spec ``0 18 1 * *`` (1st of the month at 18:00)."""
    logger.info("Income day.")
@dramatiq.actor(periodic=cron('0 0 25 12 *'))
def yearly():
    """Log a greeting; declared periodic with cron spec ``0 0 25 12 *`` (25 December at 00:00)."""
    # Spelling of the emitted message corrected ("Chrismas" -> "Christmas").
    logger.info("Merry Christmas!")
@dramatiq.actor()
def notperiodic():
    """Actor declared without a ``periodic`` option; it always raises when run.

    Raises:
        RuntimeError: unconditionally, to signal that this actor was executed
            although it carries no schedule.
    """
    # RuntimeError is a subclass of Exception, so handlers catching Exception
    # still work, while the raise is no longer a bare, overly broad Exception.
    raise RuntimeError("Must not be scheduled.")