Skip to content

Commit

Permalink
Replace Celery by RQ for async job queue #6 (#56)
Browse files Browse the repository at this point in the history
Signed-off-by: tdruez <tdruez@nexb.com>
  • Loading branch information
tdruez authored Feb 23, 2024
1 parent 7d4eb38 commit 48144d1
Show file tree
Hide file tree
Showing 34 changed files with 148 additions and 275 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
*.pyc
*.db
*.rdb
.installed.cfg
parts
develop-eggs
Expand Down
5 changes: 4 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
Release notes
=============

### Version 5.0.2-dev
### Version 5.1.0-dev

- Replace Celery by RQ for async job queue and worker.
https://github.com/nexB/dejacode/issues/6

- Lookup in PurlDB by purl in Add Package form.
When a Package URL is available in the context of the "Add Package" form,
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ postgresdb:
@createdb --owner=${DB_USERNAME} ${POSTGRES_INITDB_ARGS} ${DB_NAME}

run:
${MANAGE} runserver 8000
${MANAGE} runserver 8000 --insecure

worker:
${MANAGE} rqworker

test:
@echo "-> Run the test suite"
Expand Down
2 changes: 1 addition & 1 deletion component_catalog/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from dje.models import ParentChildModelMixin
from dje.models import ParentChildRelationshipModel
from dje.models import ReferenceNotesMixin
from dje.tasks import tasks_logger
from dje.tasks import logger as tasks_logger
from dje.utils import set_fields_from_object
from dje.validators import generic_uri_validator
from dje.validators import validate_url_segment
Expand Down
5 changes: 1 addition & 4 deletions dejacode/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@
import os
import sys

from dejacode.celery import app as celery_app

VERSION = "5.0.2-dev"
VERSION = "5.1.0-dev"
__version__ = VERSION
__all__ = ["celery_app"]


def command_line():
Expand Down
23 changes: 0 additions & 23 deletions dejacode/celery.py

This file was deleted.

48 changes: 40 additions & 8 deletions dejacode/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ def gettext_noop(s):
"django.contrib.admin",
"rest_framework",
"rest_framework.authtoken",
"django_rq",
"crispy_forms",
"crispy_bootstrap5",
"guardian",
Expand Down Expand Up @@ -400,8 +401,7 @@ def gettext_noop(s):

# Default setup for the cache
# See https://docs.djangoproject.com/en/dev/topics/cache/
# Set CACHE_BACKEND="django.core.cache.backends.locmem.LocMemCache" in dev mode
CACHE_BACKEND = env.str("CACHE_BACKEND", default="django.core.cache.backends.redis.RedisCache")
CACHE_BACKEND = env.str("CACHE_BACKEND", default="django.core.cache.backends.locmem.LocMemCache")
CACHES = {
"default": {
"BACKEND": CACHE_BACKEND,
Expand All @@ -416,6 +416,44 @@ def gettext_noop(s):
},
}

# Job Queue
RQ_QUEUES = {
"default": {
"HOST": env.str("DEJACODE_REDIS_HOST", default="localhost"),
"PORT": env.str("DEJACODE_REDIS_PORT", default="6379"),
"PASSWORD": env.str("DEJACODE_REDIS_PASSWORD", default=""),
"DEFAULT_TIMEOUT": env.int("DEJACODE_REDIS_DEFAULT_TIMEOUT", default=360),
},
}


def enable_rq_eager_mode():
"""
Enable the eager mode for the RQ tasks system.
Meaning the tasks will run directly sychroniously without the need of a worker.
Setting ASYNC to False in RQ_QUEUES will run jobs synchronously, but a running
Redis server is still needed to store job data.
This function patch the django_rq.get_redis_connection to always return a fake
redis connection using the `fakeredis` library.
"""
import django_rq.queues
from fakeredis import FakeRedis
from fakeredis import FakeStrictRedis

for queue_config in RQ_QUEUES.values():
queue_config["ASYNC"] = False

def get_fake_redis_connection(config, use_strict_redis):
return FakeStrictRedis() if use_strict_redis else FakeRedis()

django_rq.queues.get_redis_connection = get_fake_redis_connection


DEJACODE_ASYNC = env.bool("DEJACODE_ASYNC", default=False)
if not DEJACODE_ASYNC or IS_TESTS:
enable_rq_eager_mode()


# https://docs.djangoproject.com/en/dev/topics/logging/#configuring-logging
LOGGING = {
"version": 1,
Expand Down Expand Up @@ -597,10 +635,6 @@ def gettext_noop(s):
# authentication.
AXES_DISABLE_ACCESS_LOG = True

# Celery
CELERY_BROKER_URL = env.str("CELERY_BROKER_URL", default="redis://")
CELERY_TASK_ALWAYS_EAGER = env.bool("CELERY_TASK_ALWAYS_EAGER", default=False)

# 2FA with django-otp
OTP_TOTP_ISSUER = "DejaCode"

Expand All @@ -618,8 +652,6 @@ def gettext_noop(s):
INTERNAL_IPS = ["127.0.0.1"]

if IS_TESTS:
CELERY_TASK_ALWAYS_EAGER = True
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
# Silent the django-axes logging during tests
LOGGING["loggers"].update({"axes": {"handlers": ["null"]}})
# Set a faster hashing algorithm for running the tests
Expand Down
42 changes: 21 additions & 21 deletions dje/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# See https://aboutcode.org for more information about AboutCode FOSS projects.
#

import logging
from datetime import datetime
from io import StringIO

Expand All @@ -18,16 +19,15 @@
from django.db import transaction
from django.template.defaultfilters import pluralize

from celery import shared_task
from celery.utils.log import get_task_logger
from django_rq import job

from dejacode_toolkit.scancodeio import ScanCodeIO
from dje.utils import is_available

tasks_logger = get_task_logger(__name__)
logger = logging.getLogger(__name__)


@shared_task
@job
def send_mail_task(subject, message, from_email, recipient_list, fail_silently=True):
"""
Send an email as an asynchronous task.
Expand All @@ -41,7 +41,7 @@ def send_mail_task(subject, message, from_email, recipient_list, fail_silently=T
send_mail(subject, message, from_email, recipient_list, fail_silently)


@shared_task
@job
def send_mail_to_admins_task(subject, message, from_email=None, fail_silently=True):
"""Send an email to system administrators as an asynchronous task."""
if not from_email:
Expand All @@ -54,10 +54,10 @@ def send_mail_to_admins_task(subject, message, from_email=None, fail_silently=Tr
send_mail_task(subject, message, from_email, recipient_list, fail_silently)


@shared_task(time_limit=7200)
@job("default", timeout=7200)
def call_management_command(name, *args, **options):
"""Run a management command as an asynchronous task."""
tasks_logger.info(
logger.info(
f"Entering call_management_command task with name={name} args={args} options={options}"
)

Expand All @@ -76,25 +76,25 @@ def call_management_command(name, *args, **options):
user.email_user(subject, msg)


@shared_task
@job
def package_collect_data(instance_id):
"""Run Package.collect_data() as an asynchronous task."""
Package = apps.get_model("component_catalog", "package")
tasks_logger.info(f"Entering package_collect_data task for Package.id={instance_id}")
logger.info(f"Entering package_collect_data task for Package.id={instance_id}")
package = Package.objects.get(id=instance_id)
tasks_logger.info(f"Package Download URL: {package.download_url}")
logger.info(f"Package Download URL: {package.download_url}")
package.collect_data()


@shared_task
@job
def scancodeio_submit_scan(uris, user_uuid, dataspace_uuid):
"""
Submit the provided `uris` to ScanCode.io as an asynchronous task.
Only publicly available URLs are sent to ScanCode.io.
"""
from dje.models import DejacodeUser

tasks_logger.info(
logger.info(
f"Entering scancodeio_submit_scan task with "
f"uris={uris} user_uuid={user_uuid} dataspace_uuid={dataspace_uuid}"
)
Expand All @@ -111,18 +111,18 @@ def scancodeio_submit_scan(uris, user_uuid, dataspace_uuid):
if is_available(uri):
ScanCodeIO(user).submit_scan(uri, user_uuid, dataspace_uuid)
else:
tasks_logger.info(f'uri="{uri}" is not reachable.')
logger.info(f'uri="{uri}" is not reachable.')


@shared_task
@job
def scancodeio_submit_manifest_inspection(scancodeproject_uuid, user_uuid):
"""
Submit the provided `uris` to ScanCode.io as an asynchronous task.
Only publicly available URLs are sent to ScanCode.io.
"""
from dje.models import DejacodeUser

tasks_logger.info(
logger.info(
f"Entering scancodeio_submit_manifest_inspection task with "
f"scancodeproject_uuid={scancodeproject_uuid} user_uuid={user_uuid}"
)
Expand All @@ -148,13 +148,13 @@ def scancodeio_submit_manifest_inspection(scancodeproject_uuid, user_uuid):
)

if not response:
tasks_logger.info("Error submitting the manifest to ScanCode.io server")
logger.info("Error submitting the manifest to ScanCode.io server")
scancode_project.status = ScanCodeProject.Status.FAILURE
msg = "- Error: Manifest could not be submitted to ScanCode.io"
scancode_project.append_to_log(msg, save=True)
return

tasks_logger.info("Update the ScanCodeProject instance")
logger.info("Update the ScanCodeProject instance")
scancode_project.status = ScanCodeProject.Status.SUBMITTED
scancode_project.project_uuid = response.get("uuid")
msg = "- Manifest submitted to ScanCode.io for inspection"
Expand All @@ -163,17 +163,17 @@ def scancodeio_submit_manifest_inspection(scancodeproject_uuid, user_uuid):
# Delay the execution of the pipeline after the ScancodeProject instance was
# properly saved and committed in order to avoid any race conditions.
if runs := response.get("runs"):
tasks_logger.info("Start the pipeline run")
logger.info("Start the pipeline run")
transaction.on_commit(lambda: scancodeio.start_pipeline(run_url=runs[0]["url"]))


@shared_task
@job
def pull_project_data_from_scancodeio(scancodeproject_uuid):
"""
Pull Project data from ScanCode.io as an asynchronous task for the provided
`scancodeproject_uuid`.
"""
tasks_logger.info(
logger.info(
f"Entering pull_project_data_from_scancodeio task with "
f"scancodeproject_uuid={scancodeproject_uuid}"
)
Expand All @@ -184,7 +184,7 @@ def pull_project_data_from_scancodeio(scancodeproject_uuid):
# Make sure the import is not already in progress,
# or that the import has not completed yet.
if not scancode_project.can_start_import:
tasks_logger.error("Cannot start import")
logger.error("Cannot start import")
return

# Update the status to prevent from starting the task again
Expand Down
2 changes: 1 addition & 1 deletion dje/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1583,7 +1583,7 @@ def object_compare_view(request):

@login_required
def clone_dataset_view(request, pk):
"""Call the clonedataset management command as a celery task."""
"""Call the clonedataset management command as a an async task."""
changelist_url = reverse("admin:dje_dataspace_changelist")
user = request.user
template_dataspace = settings.TEMPLATE_DATASPACE
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ services:
build: .
# Ensure that potential db migrations run first by waiting until "web" is up
command: wait-for-it --strict --timeout=180 web:8000 -- sh -c "
celery worker --app=dejacode --concurrency=2 --time-limit=300 --loglevel=INFO"
./manage.py rqworker --verbosity 1"
env_file:
- docker.env
volumes:
Expand Down
4 changes: 3 additions & 1 deletion docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ DATABASE_NAME=dejacode_db
DATABASE_USER=dejacode
DATABASE_PASSWORD=dejacode

DEJACODE_REDIS_HOST=redis
DEJACODE_ASYNC=True

STATIC_ROOT=/var/dejacode/static/
MEDIA_ROOT=/var/dejacode/media/
REDIS_URL=redis://redis:6379
CELERY_BROKER_URL=redis://redis:6379
CACHE_BACKEND=django.core.cache.backends.redis.RedisCache
CLAMD_ENABLED=True
CLAMD_TCP_ADDR=clamav
Loading

0 comments on commit 48144d1

Please sign in to comment.