Skip to content

Commit

Permalink
Add ability to ship periodic RQ jobs as part of extensions again. (#4822
Browse files Browse the repository at this point in the history
)

This was dropped in aa17681.
  • Loading branch information
jezdez authored Apr 28, 2020
1 parent bb767f3 commit 2c90d92
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 25 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ venv/
coverage.xml
client/dist
.DS_Store
celerybeat-schedule*
.#*
\#*#
*~
Expand Down
28 changes: 14 additions & 14 deletions redash/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
# The global Redash extension registry
extensions = odict()

# The periodic Celery tasks as provided by Redash extensions.
# This is separate from the internal periodic Celery tasks in
# celery_schedule since the extension task discovery phase is
# The periodic RQ jobs as provided by Redash extensions.
# This is separate from the internal periodic RQ jobs
# since the extension job discovery phase is
# after the configuration has already happened.
periodic_tasks = odict()
periodic_jobs = odict()

extension_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,38 +69,38 @@ def extension(app):
entry_point_loader("redash.extensions", extensions, logger=app.logger, app=app)


def load_periodic_tasks(logger=None):
"""Load the periodic tasks as defined in Redash extensions.
def load_periodic_jobs(logger=None):
"""Load the periodic jobs as defined in Redash extensions.
The periodic task entry point needs to return a set of parameters
that can be passed to Celery's add_periodic_task:
that can be passed to RQ Scheduler API:
https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries
https://github.com/rq/rq-scheduler#periodic--repeated-jobs
E.g.::
def add_two_and_two():
return {
'name': 'add 2 and 2 every 10 seconds'
'sig': add.s(2, 2),
'schedule': 10.0, # in seconds or a timedelta
"func": add,
"args": [2, 2]
"interval": 10, # in seconds or as a timedelta
}
and then registered with an entry point under the "redash.periodic_tasks"
and then registered with an entry point under the "redash.periodic_jobs"
group, e.g. in your setup.py::
setup(
# ...
entry_points={
"redash.periodic_tasks": [
"redash.periodic_jobs": [
"add_two_and_two = calculus.addition:add_two_and_two",
]
# ...
},
# ...
)
"""
entry_point_loader("redash.periodic_tasks", periodic_tasks, logger=logger)
entry_point_loader("redash.periodic_jobs", periodic_jobs, logger=logger)


def init_app(app):
Expand Down
8 changes: 5 additions & 3 deletions redash/tasks/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
import hashlib
import json
from datetime import datetime, timedelta
from functools import partial
from random import randint

from rq.job import Job
from rq_scheduler import Scheduler

from redash import settings, rq_redis_connection
from redash import extensions, settings, rq_redis_connection
from redash.tasks import (
sync_user_details,
refresh_queries,
Expand Down Expand Up @@ -79,6 +77,10 @@ def periodic_job_definitions():
# Add your own custom periodic jobs in your dynamic_settings module.
jobs.extend(settings.dynamic_settings.periodic_jobs() or [])

# Add periodic jobs that are shipped as part of Redash extensions
extensions.load_periodic_jobs(logger)
jobs.extend(list(extensions.periodic_jobs.values()))

return jobs


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ not_findable_extension = redash_dummy:missing_attribute
not_importable_extension = missing_extension_module:extension
working_extension = redash_dummy:extension

[redash.periodic_tasks]
dummy_periodic_task = redash_dummy:periodic_task
[redash.periodic_jobs]
dummy_periodic_job = redash_dummy:periodic_job

15 changes: 13 additions & 2 deletions tests/extensions/redash-dummy/redash_dummy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import timedelta

module_attribute = "hello!"


Expand All @@ -11,5 +13,14 @@ def assertive_extension(app):
assert False


def periodic_task(*args, **kwargs):
"""This periodic task will successfully load"""
def job_callback():
return "result"


def periodic_job(*args, **kwargs):
"""This periodic job will successfully load"""
return {
"func": job_callback,
"timeout": 60,
"interval": timedelta(minutes=1),
}
2 changes: 1 addition & 1 deletion tests/extensions/redash-dummy/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"not_importable_extension = missing_extension_module:extension",
"assertive_extension = redash_dummy:assertive_extension",
],
"redash.periodic_tasks": ["dummy_periodic_task = redash_dummy:periodic_task"],
"redash.periodic_jobs": ["dummy_periodic_job = redash_dummy:periodic_job"],
},
py_modules=["redash_dummy"],
)
10 changes: 8 additions & 2 deletions tests/extensions/test_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys

from redash import extensions
from redash.tasks import periodic_job_definitions
from tests import BaseTestCase

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -41,5 +42,10 @@ def test_dummy_periodic_task(self):
# need to load the periodic tasks manually since this isn't
# done automatically on test suite start but only part of
# the worker configuration
extensions.load_periodic_tasks(logger)
self.assertIn("dummy_periodic_task", extensions.periodic_tasks.keys())
extensions.load_periodic_jobs(logger)
self.assertIn("dummy_periodic_job", extensions.periodic_jobs.keys())

def test_dummy_periodic_task_definitions(self):
jobs = periodic_job_definitions()
from redash_dummy import job_callback
self.assertIn(job_callback, [job.get("func", None) for job in jobs])

0 comments on commit 2c90d92

Please sign in to comment.