Skip to content

Commit

Permalink
Initial dispatcher library migration, untested
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanCoding committed Feb 27, 2025
1 parent 7b8b37d commit 8bdbfcc
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 26 deletions.
26 changes: 26 additions & 0 deletions awx/main/apps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os

from dispatcher.config import setup as dispatcher_setup

from django.apps import AppConfig
from django.utils.translation import gettext_lazy as _
from awx.main.utils.common import bypass_in_test, load_all_entry_points_for
Expand Down Expand Up @@ -79,6 +81,30 @@ def load_inventory_plugins(self):
def ready(self):
super().ready()

from django.conf import settings
from awx.main.utils.db import get_pg_notify_params
from awx.main.dispatch import get_task_queuename

dispatcher_setup(
{
"version": 2,
"service": {"pool_kwargs": {"max_workers": 4}, "main_kwargs": {"node_id": settings.CLUSTER_HOST_ID}},
"brokers": {
"pg_notify": {
"config": get_pg_notify_params(),
"sync_connection_factory": "awx.main.utils.db.psycopg_connection_from_django",
"channels": ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()],
# "default_publish_channel": "tower_broadcast_all"
}
},
"producers": {
"ScheduledProducer": {"task_schedule": settings.DISPATCHER_SCHEDULE},
"OnStartProducer": {"task_list": {"awx.main.tasks.system.dispatch_startup": {}}},
},
"publish": {"default_broker": "pg_notify"},
}
)

"""
Credential loading triggers database operations. There are cases we want to call
awx-manage collectstatic without a database. All management commands invoke the ready() code
Expand Down
84 changes: 59 additions & 25 deletions awx/main/management/commands/run_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
from django.conf import settings
from django.core.management.base import BaseCommand

from flags.state import flag_enabled

from dispatcher.factories import get_control_from_settings
from dispatcher import run_service

from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.control import Control
from awx.main.dispatch.pool import AutoscalePool
Expand Down Expand Up @@ -40,36 +45,65 @@ def add_arguments(self, parser):

def handle(self, *arg, **options):
if options.get('status'):
print(Control('dispatcher').status())
return
if flag_enabled('FEATURE_NEW_DISPATCHER'):
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('worker')
print(yaml.dump(running_data, default_flow_style=False))
return
else:
print(Control('dispatcher').status())
return
if options.get('schedule'):
print(Control('dispatcher').schedule())
if flag_enabled('FEATURE_NEW_DISPATCHER'):
print('NOT YET IMPLEMENTED')
return
else:
print(Control('dispatcher').schedule())
return
if options.get('running'):
print(Control('dispatcher').running())
return
if flag_enabled('FEATURE_NEW_DISPATCHER'):
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('running')
print(yaml.dump(running_data, default_flow_style=False))
return
else:
print(Control('dispatcher').running())
return
if options.get('reload'):
return Control('dispatcher').control({'control': 'reload'})
if flag_enabled('FEATURE_NEW_DISPATCHER'):
print('NOT YET IMPLEMENTED')
return
else:
return Control('dispatcher').control({'control': 'reload'})
if options.get('cancel'):
cancel_str = options.get('cancel')
try:
cancel_data = yaml.safe_load(cancel_str)
except Exception:
cancel_data = [cancel_str]
if not isinstance(cancel_data, list):
cancel_data = [cancel_str]
print(Control('dispatcher').cancel(cancel_data))
return
if flag_enabled('FEATURE_NEW_DISPATCHER'):
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('running')
print(yaml.dump(running_data, default_flow_style=False))
return
else:
cancel_str = options.get('cancel')
try:
cancel_data = yaml.safe_load(cancel_str)
except Exception:
cancel_data = [cancel_str]
if not isinstance(cancel_data, list):
cancel_data = [cancel_str]
print(Control('dispatcher').cancel(cancel_data))
return

consumer = None
if flag_enabled('FEATURE_NEW_DISPATCHER'):
run_service()
else:
consumer = None

DispatcherMetricsServer().start()
DispatcherMetricsServer().start()

try:
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
consumer.run()
except KeyboardInterrupt:
logger.debug('Terminating Task Dispatcher')
if consumer:
consumer.stop()
try:
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
consumer.run()
except KeyboardInterrupt:
logger.debug('Terminating Task Dispatcher')
if consumer:
consumer.stop()
64 changes: 64 additions & 0 deletions awx/main/utils/db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,74 @@
# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.

import psycopg
from typing import Union
from copy import deepcopy

from awx.settings.application_name import set_application_name

from django.conf import settings
from django.db import DEFAULT_DB_ALIAS, connection
from django.db.backends.postgresql.base import DatabaseWrapper as PsycopgDatabaseWrapper


def set_connection_name(function):
set_application_name(settings.DATABASES, settings.CLUSTER_HOST_ID, function=function)


# Django settings.DATABASES['alias'] dictionary type
dj_db_dict = dict[str, Union[str, int]]


def psycopg_connection_from_django(**kwargs) -> psycopg.Connection:
"Compatibility with dispatcher connection factory, just returns the Django connection"
return connection.connection


def psycopg_kwargs_from_settings_dict(settings_dict: dj_db_dict) -> dict:
"""Return psycopg connection creation kwargs given Django db settings info
:param dict setting_dict: DATABASES in Django settings
:return: kwargs that can be passed to psycopg.connect, or connection classes"""
psycopg_params = PsycopgDatabaseWrapper(settings_dict).get_connection_params().copy()
psycopg_params.pop('cursor_factory', None)
psycopg_params.pop('context', None)
return psycopg_params


def psycopg_conn_string_from_settings_dict(settings_dict: dj_db_dict) -> str:
conn_params = psycopg_kwargs_from_settings_dict(settings_dict)
return psycopg.conninfo.make_conninfo(**conn_params)


def combine_settings_dict(settings_dict1: dj_db_dict, settings_dict2: dj_db_dict, **extra_options) -> dj_db_dict:
"""Given two Django settings dictionaries, combine them and return a new settings_dict"""
settings_dict = deepcopy(settings_dict1)
settings_dict['OPTIONS'] = deepcopy(settings_dict.get('OPTIONS', {}))

# These extra options are used by AWX to set application_name
settings_dict['OPTIONS'].update(extra_options)

# Apply overrides specifically for the listener connection
for k, v in settings_dict2.items():
if k != 'OPTIONS':
settings_dict[k] = v

for k, v in settings_dict2.get('OPTIONS', {}).items():
settings_dict['OPTIONS'][k] = v

return settings_dict


def get_pg_notify_params(alias: str = DEFAULT_DB_ALIAS, **extra_options) -> dict:
pg_notify_overrides = {}
if hasattr(settings, 'PG_NOTIFY_DATABASES'):
pg_notify_overrides = settings.PG_NOTIFY_DATABASES.get(alias, {})
elif hasattr(settings, 'LISTENER_DATABASES'):
pg_notify_overrides = settings.LISTENER_DATABASES.get(alias, {})

settings_dict = combine_settings_dict(settings.DATABASES[alias], pg_notify_overrides, **extra_options)

# Reuse the Django postgres DB backend to create params for the psycopg library
psycopg_params = psycopg_kwargs_from_settings_dict(settings_dict)

return psycopg_params
10 changes: 9 additions & 1 deletion awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,11 @@
},
}

DISPATCHER_SCHEDULE = {}
for task_name, options in CELERYBEAT_SCHEDULE.items():
DISPATCHER_SCHEDULE[task_name] = options
DISPATCHER_SCHEDULE[task_name]['schedule'] = options['schedule'].total_seconds()

# Django Caching Configuration
DJANGO_REDIS_IGNORE_EXCEPTIONS = True
CACHES = {'default': {'BACKEND': 'awx.main.cache.AWXRedisCache', 'LOCATION': 'unix:///var/run/redis/redis.sock?db=1'}}
Expand Down Expand Up @@ -1084,6 +1089,9 @@


# feature flags
FLAGS = {'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}]}
FLAGS = {
'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}],
'FEATURE_NEW_DISPATCHER': [{'condition': 'boolean', 'value': False}],
}

FLAG_SOURCES = ('flags.sources.SettingsFlagsSource',)
1 change: 1 addition & 0 deletions awx/settings/development.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

# this modifies FLAGS set by defaults
FLAGS['FEATURE_INDIRECT_NODE_COUNTING_ENABLED'] = [{'condition': 'boolean', 'value': True}] # noqa
FLAGS['FEATURE_NEW_DISPATCHER'] = [{'condition': 'boolean', 'value': True}] # noqa

# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!=================================
# Disable normal scheduled/triggered task managers (DependencyManager, TaskManager, WorkflowManager).
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements_git.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ git+https://github.com/ansible/ansible-runner.git@devel#egg=ansible-runner
django-ansible-base @ git+https://github.com/ansible/django-ansible-base@devel#egg=django-ansible-base[rest-filters,jwt_consumer,resource-registry,rbac,feature-flags]
awx-plugins-core @ git+https://github.com/ansible/awx-plugins.git@devel#egg=awx-plugins-core[credentials-github-app]
awx_plugins.interfaces @ git+https://github.com/ansible/awx_plugins.interfaces.git
git+https://github.com/ansible/dispatcher.git

0 comments on commit 8bdbfcc

Please sign in to comment.